Posted to jira@kafka.apache.org by "blacktooth (via GitHub)" <gi...@apache.org> on 2023/06/22 16:36:06 UTC

[GitHub] [kafka] blacktooth opened a new pull request, #13905: Fix MM2 not consuming from latest when "auto.offset.reset=latest

blacktooth opened a new pull request, #13905:
URL: https://github.com/apache/kafka/pull/13905

   …=latest" is set.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1893019795

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


[GitHub] [kafka] C0urante commented on a diff in pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13905:
URL: https://github.com/apache/kafka/pull/13905#discussion_r1247963748


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -53,6 +53,7 @@ public class MirrorSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
 
     private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+    public static final long NON_EXISTING_OFFSET_VALUE = -1L;

Review Comment:
   Perhaps, instead of using a single sentinel value to denote uncommitted offsets, we could treat all values less than zero as uncommitted and replace this constant with a method? That could also improve the readability of some of the Java 8 streams logic.
   
   ```java
   private boolean isUncommitted(Long offset) {
       return offset == null || offset < 0;
   }
   ```
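   A minimal, self-contained sketch of the suggested helper follows. It assumes offsets are held in a `Map<String, Long>` keyed by a stand-in partition name (the task itself uses `TopicPartition` keys), and the class name `UncommittedOffsetsSketch` is hypothetical. Treating `null` as uncommitted also sidesteps the `NullPointerException` that `offset == NON_EXISTING_OFFSET_VALUE` would throw if a `null` `Long` ever reached it, since the comparison unboxes the `Long`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch; not the actual MirrorSourceTask code.
   public class UncommittedOffsetsSketch {

       // A missing (null) offset or any negative sentinel means "no committed offset".
       static boolean isUncommitted(Long offset) {
           return offset == null || offset < 0;
       }

       // Count partitions without a committed offset, in the style of the suggested log.info call.
       static long countUncommitted(Map<String, Long> offsets) {
           return offsets.values().stream()
                   .filter(UncommittedOffsetsSketch::isUncommitted)
                   .count();
       }

       public static void main(String[] args) {
           Map<String, Long> offsets = new HashMap<>();
           offsets.put("topic-0", 42L);  // committed
           offsets.put("topic-1", 0L);   // committed at offset 0
           offsets.put("topic-2", -1L);  // sentinel for "no offset"
           offsets.put("topic-3", null); // missing entirely
           System.out.println(countUncommitted(offsets)); // prints 2
       }
   }
   ```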



Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1767027696

   @electrical @showuon @blacktooth I've published https://github.com/apache/kafka/pull/14567, which is based off this PR and contains the suggestions I proposed during code review (including the new integration test) and is rebased on the latest trunk.


[GitHub] [kafka] C0urante commented on pull request #13905: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1604331568

   @divijvaidya is there a reason you've assigned this to me?


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "electrical (via GitHub)" <gi...@apache.org>.
electrical commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753259175

   @C0urante the other PR is **https://github.com/apache/kafka/pull/12358**
   So far it seems to have been reviewed and approved, but there has been no movement since then.


[GitHub] [kafka] C0urante commented on a diff in pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13905:
URL: https://github.com/apache/kafka/pull/13905#discussion_r1247968250


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -31,25 +31,36 @@
 import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+import org.mockito.internal.util.collections.Sets;

Review Comment:
   Nit: it's probably better not to depend on Mockito's internal packages if we can avoid it. I've left some suggestions below on how to do this without much additional work.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
         Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
         consumer.assign(topicPartitionOffsets.keySet());
         log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream()
-            .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+            .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());

Review Comment:
   Nit: we can clean this up if we use the `isUncommitted` method
   
   ```suggestion
           log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
                   .filter(this::isUncommitted).count());
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -214,6 +225,82 @@ public void testPoll() {
         }
     }
 
+    @Test
+    public void testSeekBehaviorDuringStart() {
+        // Setting up mock behavior.
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class);
+
+        String sourceClusterName = "sourceCluster";
+        MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class);
+
+        SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class);
+        OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class);
+        when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
+
+        MockedStatic<MirrorUtils> mockMirrorUtils = mockStatic(MirrorUtils.class, new CallsRealMethods());
+        mockMirrorUtils.when(() -> MirrorUtils.newConsumer(anyMap())).thenReturn(mockConsumer);
+        mockMirrorUtils.when(() -> MirrorUtils.newProducer(anyMap())).thenReturn(mockProducer);
+
+        Set<TopicPartition> topicPartitions = Sets.newSet(

Review Comment:
   ```suggestion
           Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
   ```
   
   (will also require a couple new imports and an additional trailing parenthesis)
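   A sketch of a JDK-only replacement for Mockito's internal `Sets.newSet(...)` that works on Java 8. Plain `String` partition names stand in for `TopicPartition`, and the class and partition names are hypothetical.

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical sketch: build a mutable test fixture set using only public JDK APIs.
   public class TestFixtureSetSketch {

       static Set<String> topicPartitions() {
           // HashSet(Collection) copies the list; wrapping Arrays.asList(...) is why
           // one more closing parenthesis is needed than with a varargs factory.
           return new HashSet<>(Arrays.asList(
                   "topic1-0",
                   "topic1-1",
                   "topic2-0"));
       }

       public static void main(String[] args) {
           Set<String> partitions = topicPartitions();
           System.out.println(partitions.size()); // prints 3
       }
   }
   ```

   On Java 9 and later, `Set.of(...)` would also work, but it returns an immutable set and rejects duplicate elements.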



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
         Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
         consumer.assign(topicPartitionOffsets.keySet());
         log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream()
-            .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+            .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());
+
+        log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream()
+                .filter(topicPartitionOffset ->
+                        topicPartitionOffset.getValue() != NON_EXISTING_OFFSET_VALUE));
+
+        topicPartitionOffsets.forEach(this::maybeSeek);
         log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
             taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
     }
 
+    private void maybeSeek(TopicPartition topicPartition, Long offset) {
+        // Do not call seek on partitions that don't have an existing offset committed.
+        if (offset == NON_EXISTING_OFFSET_VALUE) {

Review Comment:
   If we use `isUncommitted`:
   ```suggestion
           if (isUncommitted(offset)) {
   ```
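   To illustrate the intent of `maybeSeek`, here is a runnable sketch. It assumes that, when an assigned partition has no explicit position and no committed group offset, the consumer falls back to its `auto.offset.reset` policy (such as `latest`), whereas an explicit `seek` overrides that policy. The `Seeker` interface and the class name are hypothetical stand-ins for `KafkaConsumer` and the task, so the example runs without Kafka on the classpath.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical sketch of the maybeSeek logic; Seeker stands in for KafkaConsumer.
   public class MaybeSeekSketch {

       interface Seeker {
           void seek(String partition, long offset);
       }

       static boolean isUncommitted(Long offset) {
           return offset == null || offset < 0;
       }

       // Seek only when an offset was committed; otherwise leave the position unset so
       // that the consumer's auto.offset.reset policy decides where to start.
       static void maybeSeek(Seeker seeker, String partition, Long offset) {
           if (isUncommitted(offset)) {
               return;
           }
           seeker.seek(partition, offset);
       }

       public static void main(String[] args) {
           Map<String, Long> loaded = new LinkedHashMap<>();
           loaded.put("topic-0", 100L);
           loaded.put("topic-1", -1L);

           // Record seeks instead of performing them.
           Map<String, Long> seeks = new LinkedHashMap<>();
           Seeker recorder = (partition, offset) -> seeks.put(partition, offset);
           loaded.forEach((partition, offset) -> maybeSeek(recorder, partition, offset));

           System.out.println(seeks); // prints {topic-0=100}
       }
   }
   ```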



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
         Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
         consumer.assign(topicPartitionOffsets.keySet());
         log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream()
-            .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+            .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());
+
+        log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream()
+                .filter(topicPartitionOffset ->
+                        topicPartitionOffset.getValue() != NON_EXISTING_OFFSET_VALUE));
+
+        topicPartitionOffsets.forEach(this::maybeSeek);

Review Comment:
   Perhaps to simplify testing and obviate the need for static mocking, we can pull out some of this logic into a separate method?
   
   I'm imagining something like this:
   
   ```java
   // visible for testing
   void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
       Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
       consumer.assign(topicPartitionOffsets.keySet());
       log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
               .filter(this::isUncommitted).count());
   
       log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream()
               .filter(tpo -> !isUncommitted(tpo.getValue())));
   
       topicPartitionOffsets.forEach(this::maybeSeek);
   }
   ```
   
   And then in `start` we would have:
   ```java
   Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
   initializeConsumer(taskTopicPartitions);
   ```
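   A rough, self-contained sketch of the extraction idea, in which the consumer and offset loading are injected so a test can pass fakes instead of relying on static mocking. `ConsumerLike`, the `offsetLoader` function, and returning the trace message as a string are hypothetical stand-ins for `KafkaConsumer`, `loadOffsets`, and SLF4J logging. One further detail: handing `entrySet().stream().filter(...)` straight to `log.trace` would log the `Stream` object's default `toString()` rather than the entries, so this sketch collects the entries into a map before formatting the message.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeMap;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   // Hypothetical sketch of an extracted initializeConsumer, testable without static mocks.
   public class InitializeConsumerSketch {

       // Stand-in for the subset of KafkaConsumer used during initialization.
       interface ConsumerLike {
           void assign(Set<String> partitions);
           void seek(String partition, long offset);
       }

       private final ConsumerLike consumer;
       private final Function<Set<String>, Map<String, Long>> offsetLoader; // stands in for loadOffsets

       InitializeConsumerSketch(ConsumerLike consumer, Function<Set<String>, Map<String, Long>> offsetLoader) {
           this.consumer = consumer;
           this.offsetLoader = offsetLoader;
       }

       private static boolean isUncommitted(Long offset) {
           return offset == null || offset < 0;
       }

       // Returns the text that would be trace-logged, so a test can inspect it.
       String initializeConsumer(Set<String> taskTopicPartitions) {
           Map<String, Long> offsets = offsetLoader.apply(taskTopicPartitions);
           consumer.assign(offsets.keySet());

           // Collect before logging; a bare Stream would print as an object reference.
           Map<String, Long> toSeek = offsets.entrySet().stream()
                   .filter(e -> !isUncommitted(e.getValue()))
                   .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, TreeMap::new));
           String traceMessage = "Seeking offsets: " + toSeek;

           // Partitions without a committed offset are never sought.
           toSeek.forEach(consumer::seek);
           return traceMessage;
       }

       public static void main(String[] args) {
           Map<String, Long> seeks = new LinkedHashMap<>();
           ConsumerLike fake = new ConsumerLike() {
               @Override public void assign(Set<String> partitions) { }
               @Override public void seek(String partition, long offset) { seeks.put(partition, offset); }
           };
           Map<String, Long> stored = new HashMap<>();
           stored.put("topic-0", 7L);
           stored.put("topic-1", -1L);

           InitializeConsumerSketch task = new InitializeConsumerSketch(fake, partitions -> stored);
           String trace = task.initializeConsumer(new HashSet<>(stored.keySet()));
           System.out.println(trace); // prints Seeking offsets: {topic-0=7}
           System.out.println(seeks); // prints {topic-0=7}
       }
   }
   ```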



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -214,6 +225,82 @@ public void testPoll() {
         }
     }
 
+    @Test
+    public void testSeekBehaviorDuringStart() {
+        // Setting up mock behavior.
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class);
+
+        String sourceClusterName = "sourceCluster";
+        MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class);
+
+        SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class);
+        OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class);
+        when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
+
+        MockedStatic<MirrorUtils> mockMirrorUtils = mockStatic(MirrorUtils.class, new CallsRealMethods());

Review Comment:
   I don't think we'll need this if we adopt the isolated `initializeConsumer` method, but just in case, we can tweak this to use official API from Mockito instead of an internal class:
   ```suggestion
           MockedStatic<MirrorUtils> mockMirrorUtils = mockStatic(MirrorUtils.class, CALLS_REAL_METHODS);
   ```
   
   (requires a static import for `org.mockito.Mockito.CALLS_REAL_METHODS`)



[GitHub] [kafka] showuon commented on pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1709349339

   @blacktooth, we are also facing a similar issue and came across this PR. It has been pending for a while. Do you have time to address the review comments? Do you need any help? Thanks.


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "xinyuliu-cb (via GitHub)" <gi...@apache.org>.
xinyuliu-cb commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1743917984

   Same here. Catching up from the earliest offsets is too expensive and resource-intensive for MM2 and the Kafka brokers.


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "electrical (via GitHub)" <gi...@apache.org>.
electrical commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753118374

   Hi,
   
   It seems there are now two PRs open to solve the same issue, but neither is moving forward, and this is blocking a lot of people from migrating to MM2.
   I really hope someone will push this over the finish line.


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante closed pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set
URL: https://github.com/apache/kafka/pull/13905


[GitHub] [kafka] divijvaidya commented on pull request #13905: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1604358439

   @C0urante oh, I should have mentioned the reason. You are the committer I consider an expert on MM2 and the Connect framework, and that is why I assigned it to you.


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753124395

   @electrical can you link to the second PR? I've reviewed this one and am waiting on the author. If a second author is more responsive, we may be able to fix this issue sooner.


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1898635563

   Superseded by https://github.com/apache/kafka/pull/14567

