You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 06:58:19 UTC

[GitHub] [kafka] asdaraujo opened a new pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

asdaraujo opened a new pull request #8730:
URL: https://github.com/apache/kafka/pull/8730


   Changed the MM2 checkpoint mirror task to ensure it replicates consumer offsets even when they are equal to zero to avoid issues with consumers after failovers.
   
   Modified the test case to cover the possible scenario of consumer gap, as described on KAFKA-10048. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-664180174


   @mimaison, would you be able to review this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-683909598


   @mimaison Thanks for the feedback. I've refactored the tests. Could you please give it another review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480267347



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -190,24 +211,19 @@ public void close() {
     public void testReplication() throws InterruptedException {
 
         // create consumers before starting the connectors so we don't need to wait for discovery
-        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
+        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "backup.test-topic-1");
         consumer1.poll(Duration.ofMillis(500));
         consumer1.commitSync();
         consumer1.close();
 
-        Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
+        Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "primary.test-topic-1");

Review comment:
       Good catch. Also changed this and we're now consuming it only once.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496335557



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -201,26 +213,24 @@ public void close() {
 
     @Test
     public void testReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-testReplication";
+        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
+                put("group.id", consumerGroupName);
+                put("auto.offset.reset", "latest");

Review comment:
       Oops, spoke too soon.
   Even though `latest` is Kafka's default, `EmbeddedKafkaCluster::createConsumer` [defaults it to `earliest`](https://github.com/apache/kafka/blob/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L444)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480265451



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedExcept
         time.sleep(5000);
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+        consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "primary.test-topic-1", "primary.test-topic-2");
 
         records = consumer.poll(Duration.ofMillis(500));
         // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
         assertEquals("consumer record size is not zero", 0, records.count());
         consumer.close();
+    }
+
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int partitions, String msgPrefix) {
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions but the last one

Review comment:
       Updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r432749206



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException {
 
         assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
             new TopicPartition("primary.test-topic-1", 0)));
+        assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
+
+        // Produce additional messages.
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions this time
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
 
         // Failover consumer group to backup cluster.
-        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer1.assign(backupOffsets.keySet());
+        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+                put("group.id", "consumer-group-1");
+                put("auto.offset.reset", "latest");
+            }};
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(consumerProps);
+        List<TopicPartition> backupPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("primary.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer1.assign(backupPartitions);
         backupOffsets.forEach(consumer1::seek);
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
 
         assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
-            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("primary.test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)));
+        assertEquals("Consumer failedover to non-zero offset on last partition.", 0,
+            consumer1.position(new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
         assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
             CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
 
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages1 = consumeAllMessages(consumer1);
+        System.out.println(messages1);
+        for (TopicPartition tp : backupPartitions) {
+            assertNotNull("No data consumed from partition " + tp + ".", messages1.get(tp));
+            int expectedMessageCount = tp.toString().equals("test-topic-1-0") ? 22 : 10;

Review comment:
       You're right. This looks magic indeed! I had to think about it again to understand my own reasoning :)
   I added comments to explain what's going on here and why we expect 22 for one partition and 10 for the others.
   I left the tests as they were. I think the comment makes the test logic clear.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
heritamas commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r460197007



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -132,7 +132,7 @@ public String version() {
             return listConsumerGroupOffsets(group).entrySet().stream()
                 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
                 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-                .filter(x -> x.downstreamOffset() > 0)  // ignore offsets we cannot translate accurately
+                .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately

Review comment:
       Yes, does not hurt to leave it. Just for sure.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r432585613



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -316,6 +354,23 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer) {

Review comment:
       Good point. Will add that.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #8730:
URL: https://github.com/apache/kafka/pull/8730


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496306398



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -136,10 +141,19 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions of test-topic-1
+        produceMessages(primary, "test-topic-1", "message-1-");
+        produceMessages(backup, "test-topic-1", "message-2-");
+
+        // Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+        Map<String, Object> dummyProps = new HashMap<String, Object>();

Review comment:
       Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496326988



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -201,26 +213,24 @@ public void close() {
 
     @Test
     public void testReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-testReplication";
+        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
+                put("group.id", consumerGroupName);
+                put("auto.offset.reset", "latest");

Review comment:
       Yep, unnecessary. I'll remove this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r432589305



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException {
 
         assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
             new TopicPartition("primary.test-topic-1", 0)));
+        assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
+
+        // Produce additional messages.
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions this time
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
 
         // Failover consumer group to backup cluster.
-        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer1.assign(backupOffsets.keySet());
+        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+                put("group.id", "consumer-group-1");
+                put("auto.offset.reset", "latest");
+            }};
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(consumerProps);
+        List<TopicPartition> backupPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("primary.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer1.assign(backupPartitions);
         backupOffsets.forEach(consumer1::seek);
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
 
         assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
-            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("primary.test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)));
+        assertEquals("Consumer failedover to non-zero offset on last partition.", 0,
+            consumer1.position(new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
         assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
             CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
 
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages1 = consumeAllMessages(consumer1);
+        System.out.println(messages1);

Review comment:
       Oops... it escaped my final review. Removing it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
viktorsomogyi commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-664176258


   @ijuma would you please review this or suggest who is the right committer to help with this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
heritamas commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r435424111



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -132,7 +132,7 @@ public String version() {
             return listConsumerGroupOffsets(group).entrySet().stream()
                 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
                 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-                .filter(x -> x.downstreamOffset() > 0)  // ignore offsets we cannot translate accurately
+                .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately

Review comment:
       Can downstream offset be negative?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496306777



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException {
 
         waitForCondition(() -> {
             try {
-                return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                     Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
-        primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
-
-        assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
-            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        
-        consumer2.close();
-      
+        primaryConsumer = primary.kafka().createConsumer(consumerProps);
+        primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
+        seek(primaryConsumer, primaryOffsets);
+        consumeAllMessages(primaryConsumer, 0);
+
+        assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
+            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+        assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
+            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0);
+        // If offset translation was successful we expect no messages to be consumed after failback
+        assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
+        primaryConsumer.close();
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
-            backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+        produceMessages(primary, "test-topic-2", "message-3-", 1);
+        produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+        assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+        assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+        assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+    }
+
+    @Test
+    public void testReplicationWithEmptyPartition() throws InterruptedException {

Review comment:
       Thanks, @mimaison . I've changed the test to simplify it as per your suggestion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r432587118



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -272,28 +303,35 @@ public void testReplication() throws InterruptedException {
         waitForCondition(() -> {
             try {
                 return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
-                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
+                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 1));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
+        Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(consumerProps);
+        List<TopicPartition> primaryPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("backup.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer2.assign(primaryPartitions);
         primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
 
         assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)) + Math.ceil((float) NUM_RECORDS_PRODUCED / NUM_PARTITIONS));

Review comment:
       Yep. I need to cast to float to ensure that the result of the division is a float, with a non-zero decimal value, so that ceil will round it up to the nearest integer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] edoardocomar commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
edoardocomar commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r490335096



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException {
 
         waitForCondition(() -> {
             try {
-                return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                     Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
-        primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
-
-        assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
-            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        
-        consumer2.close();
-      
+        primaryConsumer = primary.kafka().createConsumer(consumerProps);
+        primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
+        seek(primaryConsumer, primaryOffsets);
+        consumeAllMessages(primaryConsumer, 0);
+
+        assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
+            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+        assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
+            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0);
+        // If offset translation was successful we expect no messages to be consumed after failback
+        assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
+        primaryConsumer.close();
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
-            backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+        produceMessages(primary, "test-topic-2", "message-3-", 1);
+        produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+        assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+        assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+        assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+    }
+
+    @Test
+    public void testReplicationWithEmptyPartition() throws InterruptedException {
+        String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
+        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
+                put("group.id", consumerGroupName);
+                put("auto.offset.reset", "latest");
+            }};
+
+        // create topics
+        String topic = "test-topic-empty";
+        String primaryTopicReplica = "primary." + topic;
+        String backupTopicReplica = "backup." + topic;
+        primary.kafka().createTopic(topic, NUM_PARTITIONS);
+        primary.kafka().createTopic(backupTopicReplica, 1);
+        backup.kafka().createTopic(topic, NUM_PARTITIONS);
+        backup.kafka().createTopic(primaryTopicReplica, 1);
+
+        // Consume, from the primary cluster, before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic, backupTopicReplica);
+        consumeAllMessages(consumer, 0);
+
+        // produce to all test-topic-empty's partitions *but the last one*, on the primary cluster
+        produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
+
+        // Consume all messages
+        consumeAllMessages(consumer, NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1));
+        consumer.close();
+
+        // Consumer group offsets after consumption: topic's last partition doesn't yet has data, so
+        // the committed offset is 0. All other topic's partition should have offset equal to NUM_RECORDS_PER_PARTITION.
+        // backupTopicReplica still has a single empty partition, since MM2 is not yet started, and its record offset is 0.
+
+        mm2Config = new MirrorMakerConfig(mm2Props);

Review comment:
       this overwrite of mm2config should go in the setup method, IMHO




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r477485916



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one

Review comment:
       Good point. I'll look into this. Besides not affecting the other tests, it should make it simpler to reason about




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-685847906


   Thanks @asdaraujo, I'll try to take another look later on this week.
   
   Can you rebase on trunk? to pick up a change (https://github.com/apache/kafka/commit/241e1447fae8b85b3bb491f371357b8f92d2da72) that is now necessary to run the CI tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496306298



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException {
 
         waitForCondition(() -> {
             try {
-                return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                     Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
-        primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
-
-        assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
-            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        
-        consumer2.close();
-      
+        primaryConsumer = primary.kafka().createConsumer(consumerProps);
+        primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
+        seek(primaryConsumer, primaryOffsets);
+        consumeAllMessages(primaryConsumer, 0);
+
+        assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
+            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+        assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
+            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0);
+        // If offset translation was successful we expect no messages to be consumed after failback
+        assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
+        primaryConsumer.close();
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
-            backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+        produceMessages(primary, "test-topic-2", "message-3-", 1);
+        produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+        assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+        assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+        assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+    }
+
+    @Test
+    public void testReplicationWithEmptyPartition() throws InterruptedException {
+        String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
+        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
+                put("group.id", consumerGroupName);
+                put("auto.offset.reset", "latest");
+            }};
+
+        // create topics
+        String topic = "test-topic-empty";
+        String primaryTopicReplica = "primary." + topic;
+        String backupTopicReplica = "backup." + topic;
+        primary.kafka().createTopic(topic, NUM_PARTITIONS);
+        primary.kafka().createTopic(backupTopicReplica, 1);
+        backup.kafka().createTopic(topic, NUM_PARTITIONS);
+        backup.kafka().createTopic(primaryTopicReplica, 1);
+
+        // Consume, from the primary cluster, before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic, backupTopicReplica);
+        consumeAllMessages(consumer, 0);
+
+        // produce to all test-topic-empty's partitions *but the last one*, on the primary cluster
+        produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
+
+        // Consume all messages
+        consumeAllMessages(consumer, NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1));
+        consumer.close();
+
+        // Consumer group offsets after consumption: topic's last partition doesn't yet has data, so
+        // the committed offset is 0. All other topic's partition should have offset equal to NUM_RECORDS_PER_PARTITION.
+        // backupTopicReplica still has a single empty partition, since MM2 is not yet started, and its record offset is 0.
+
+        mm2Config = new MirrorMakerConfig(mm2Props);

Review comment:
       Thanks, @edoardocomar . I've addressed this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r498123333



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -429,4 +489,69 @@ private void deleteAllTopics(EmbeddedKafkaCluster cluster) {
         } catch (Throwable e) {
         }
     }
+
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, String msgPrefix) {
+        produceMessages(cluster, topicName, msgPrefix, null);

Review comment:
       I wonder if we should pass `NUM_PARTITIONS` instead of `null` for the last argument. Then `numPartitions` can be an `int` in the other `produceMessages()` method. WDYT?

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -381,45 +440,46 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
 
         waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
 
-        // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> consumer = backup.kafka().createConsumerAndSubscribeTo(
-            Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1");
+        // Map<TopicPartition, OffsetAndMetadata> offsets =

Review comment:
       Is this left over from debugging?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r490344407



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -201,26 +213,24 @@ public void close() {
 
     @Test
     public void testReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-testReplication";
+        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
+                put("group.id", consumerGroupName);
+                put("auto.offset.reset", "latest");

Review comment:
       `latest` is the default, why are we setting it?

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -136,10 +141,19 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions of test-topic-1
+        produceMessages(primary, "test-topic-1", "message-1-");
+        produceMessages(backup, "test-topic-1", "message-2-");
+
+        // Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+        Map<String, Object> dummyProps = new HashMap<String, Object>();

Review comment:
       We can use `Collections.singletonMap()` here

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException {
 
         waitForCondition(() -> {
             try {
-                return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                     Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
-        primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
-
-        assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
-            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        
-        consumer2.close();
-      
+        primaryConsumer = primary.kafka().createConsumer(consumerProps);
+        primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
+        seek(primaryConsumer, primaryOffsets);
+        consumeAllMessages(primaryConsumer, 0);
+
+        assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
+            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+        assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
+            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0);
+        // If offset translation was successful we expect no messages to be consumed after failback
+        assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
+        primaryConsumer.close();
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
-            backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+        produceMessages(primary, "test-topic-2", "message-3-", 1);
+        produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+        assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+        assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+        assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+    }
+
+    @Test
+    public void testReplicationWithEmptyPartition() throws InterruptedException {

Review comment:
       This test is overly complicated. I think it could:
   - Create a topic
   - Produce messages to all partitions but one
   - Consume all messages
   - Start a single MirrorMaker2 instance primary->backup
   - Use `RemoteClusterUtils.translateOffsets()` to retrieve offsets
   - Assert offset for the last partition is 0
   
   For example, something along these lines (this cuts a few corners so you'd need to improve it)
   
   ```suggestion
       @Test
       public void testReplicationWithEmptyPartition() throws Exception {
           String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
           Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
               put("group.id", consumerGroupName);
               put("auto.offset.reset", "earliest");
           }};
   
           String topic = "test-topic-empty";
           primary.kafka().createTopic(topic, NUM_PARTITIONS);
           mm2Config = new MirrorMakerConfig(mm2Props);
           // produce to all test-topic-empty's partitions *but the last one*, on the primary cluster
           produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
   
           // Consume, from the primary cluster, before starting the connectors so we don't need to wait for discovery
           Consumer<byte[], byte[]> consumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic);
           consumeAllMessages(consumer, NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1));
           consumer.close();
   
           waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
   
           Map<TopicPartition, OffsetAndMetadata> backupOffsets = RemoteClusterUtils.translateOffsets(
                   mm2Config.clientConfig("backup").adminConfig(),
                   "primary",
                   consumerGroupName,
                   Duration.ofMillis(CHECKPOINT_DURATION_MS));
   
           OffsetAndMetadata oam = backupOffsets.get(new TopicPartition("primary." + topic, NUM_PARTITIONS - 1));
           assertNotNull(oam);
           assertEquals(0, oam.offset());
       }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r459536318



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -132,7 +132,7 @@ public String version() {
             return listConsumerGroupOffsets(group).entrySet().stream()
                 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
                 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-                .filter(x -> x.downstreamOffset() > 0)  // ignore offsets we cannot translate accurately
+                .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately

Review comment:
       @ryannedolan @heritamas If you guys think this is safe enough I can remove that filter. But it doesn't hurt to leave it there just in case a rogue negative offset comes through for whatever reason/bug... Please let me know what you think.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ryannedolan commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r431952123



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -316,6 +354,23 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer) {

Review comment:
       Can we add a deadline to this function? Something like https://github.com/apache/kafka/blob/1c4eb1a5757df611735cfac9b709e0d80d0da4b3/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java#L164

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -272,28 +303,35 @@ public void testReplication() throws InterruptedException {
         waitForCondition(() -> {
             try {
                 return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
-                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
+                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 1));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
+        Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(consumerProps);
+        List<TopicPartition> primaryPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("backup.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer2.assign(primaryPartitions);
         primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
 
         assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)) + Math.ceil((float) NUM_RECORDS_PRODUCED / NUM_PARTITIONS));

Review comment:
       I don't see why we need the cast, ceil, and coercion here? Integer math?

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException {
 
         assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
             new TopicPartition("primary.test-topic-1", 0)));
+        assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
+
+        // Produce additional messages.
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions this time
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
 
         // Failover consumer group to backup cluster.
-        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer1.assign(backupOffsets.keySet());
+        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+                put("group.id", "consumer-group-1");
+                put("auto.offset.reset", "latest");
+            }};
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(consumerProps);
+        List<TopicPartition> backupPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("primary.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer1.assign(backupPartitions);
         backupOffsets.forEach(consumer1::seek);
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
 
         assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
-            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("primary.test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)));
+        assertEquals("Consumer failedover to non-zero offset on last partition.", 0,
+            consumer1.position(new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
         assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
             CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
 
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages1 = consumeAllMessages(consumer1);
+        System.out.println(messages1);
+        for (TopicPartition tp : backupPartitions) {
+            assertNotNull("No data consumed from partition " + tp + ".", messages1.get(tp));
+            int expectedMessageCount = tp.toString().equals("test-topic-1-0") ? 22 : 10;

Review comment:
       There is a lot of magic here -- why the special case and magic numbers? Can we restructure this to test the two cases separately maybe?

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException {
 
         assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
             new TopicPartition("primary.test-topic-1", 0)));
+        assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
+
+        // Produce additional messages.
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions this time
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
 
         // Failover consumer group to backup cluster.
-        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer1.assign(backupOffsets.keySet());
+        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+                put("group.id", "consumer-group-1");
+                put("auto.offset.reset", "latest");
+            }};
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(consumerProps);
+        List<TopicPartition> backupPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("primary.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer1.assign(backupPartitions);
         backupOffsets.forEach(consumer1::seek);
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
 
         assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
-            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("primary.test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)));
+        assertEquals("Consumer failedover to non-zero offset on last partition.", 0,
+            consumer1.position(new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
         assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
             CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
 
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages1 = consumeAllMessages(consumer1);
+        System.out.println(messages1);

Review comment:
       errant println




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480267122



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one
+        produceMessages(primary, "test-topic-1", NUM_PARTITIONS - 1, "message-1-");
+        produceMessages(backup, "test-topic-1", NUM_PARTITIONS - 1, "message-2-");
+
+        consumerProps = new HashMap<String, Object>() {{

Review comment:
       I changed the way this is done, setting it at the test-level, with test-specific CGs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r477485106



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedExcept
         time.sleep(5000);
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+        consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "primary.test-topic-1", "primary.test-topic-2");
 
         records = consumer.poll(Duration.ofMillis(500));
         // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
         assertEquals("consumer record size is not zero", 0, records.count());
         consumer.close();
+    }
+
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int partitions, String msgPrefix) {
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions but the last one

Review comment:
       Good catch. Updating.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480266459



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one

Review comment:
       I've separated the test from the existing ones, also using a different topic. Some of the logic on those tests is complex and may be hard to follow so I thought it would be better to have the tests totally separate and simpler to interpret.
   
   I think, and hope, it's easier to understand now than it was before.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ning2008wisc commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
ning2008wisc commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-702836102


   Hi @mimaison I will rebase my pr https://github.com/apache/kafka/pull/9224 on top of this one over this weekend and hope you may have time soon to review it as your next one. Thanks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] asdaraujo commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
asdaraujo commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-685935118


   Thanks, @mimaison . I've rebased it.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
heritamas commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r449440511



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -132,7 +132,7 @@ public String version() {
             return listConsumerGroupOffsets(group).entrySet().stream()
                 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
                 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-                .filter(x -> x.downstreamOffset() > 0)  // ignore offsets we cannot translate accurately
+                .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately

Review comment:
       Then, this filter matches everything, doesn't it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r463097918



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedExcept
         time.sleep(5000);
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+        consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "primary.test-topic-1", "primary.test-topic-2");
 
         records = consumer.poll(Duration.ofMillis(500));
         // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
         assertEquals("consumer record size is not zero", 0, records.count());
         consumer.close();
+    }
+
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int partitions, String msgPrefix) {
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions but the last one

Review comment:
       This comment needs updating

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -190,24 +211,19 @@ public void close() {
     public void testReplication() throws InterruptedException {
 
         // create consumers before starting the connectors so we don't need to wait for discovery
-        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
+        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "backup.test-topic-1");
         consumer1.poll(Duration.ofMillis(500));
         consumer1.commitSync();
         consumer1.close();
 
-        Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
+        Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "primary.test-topic-1");

Review comment:
       Do we still need these 2 blocks? In `setup()` we already consumed all messages

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one

Review comment:
       Would it be better using a separate topic in order to keep a partition without any records? By changing this topic it affects existing checks in all tests

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one
+        produceMessages(primary, "test-topic-1", NUM_PARTITIONS - 1, "message-1-");
+        produceMessages(backup, "test-topic-1", NUM_PARTITIONS - 1, "message-2-");
+
+        consumerProps = new HashMap<String, Object>() {{

Review comment:
       As this does not change, I wonder if we could direct initialize `consumerProps` when it's declared

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException {
 
         assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
             new TopicPartition("primary.test-topic-1", 0)));
+        assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
+
+        // Produce additional messages.
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions this time
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
 
         // Failover consumer group to backup cluster.
-        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer1.assign(backupOffsets.keySet());
+        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+                put("group.id", "consumer-group-1");
+                put("auto.offset.reset", "latest");
+            }};
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(consumerProps);
+        List<TopicPartition> backupPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("primary.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer1.assign(backupPartitions);
         backupOffsets.forEach(consumer1::seek);
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
 
         assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
-            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("primary.test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)));
+        assertEquals("Consumer failedover to non-zero offset on last partition.", 0,
+            consumer1.position(new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
         assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
             CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
 
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages1 = consumeAllMessages(consumer1);
+        System.out.println(messages1);
+        for (TopicPartition tp : backupPartitions) {
+            assertNotNull("No data consumed from partition " + tp + ".", messages1.get(tp));
+            int expectedMessageCount = tp.toString().equals("test-topic-1-0") ? 22 : 10;

Review comment:
       I'm actually surprized we only see positions `22` and `10`. Why do we only get `test-topic-1-0` here and not the other 9 partitions?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org