Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/01 00:30:43 UTC

[GitHub] [kafka] C0urante opened a new pull request, #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

C0urante opened a new pull request, #13181:
URL: https://github.com/apache/kafka/pull/13181

   The [Jira](https://issues.apache.org/jira/browse/KAFKA-14610) contains a more detailed description of the motivation for this change. To summarize: when topic partitions are bursty, offset syncs can be starved, because MM2 limits the number of syncs that can be in flight at any one time.
   
   The fix is to enqueue the latest applicable offset for each to-be-synced topic partition in the `commitRecord` method and immediately fire off every sync that can be published at that point. The `commit` method then periodically re-checks for syncs that could not be published earlier due to, e.g., a burst of throughput on a topic partition.
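
A minimal sketch of the queue-then-fire idea described above. This is not the code in this PR: the class name `OffsetSyncQueueSketch`, the `onSendComplete` callback, and the use of plain strings and a single offset per partition are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the task's sync bookkeeping; names are illustrative only.
class OffsetSyncQueueSketch {
    // At most one pending sync per partition; a newer offset overwrites an older one.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Caps the number of syncs in flight at one time.
    private final Semaphore inFlight;
    // Records what was "sent", so the behavior can be inspected.
    private final List<Long> published = new ArrayList<>();

    OffsetSyncQueueSketch(int maxInFlight) {
        this.inFlight = new Semaphore(maxInFlight);
    }

    // Per-record path: queue the latest offset, then publish whatever we can.
    synchronized void commitRecord(String partition, long upstreamOffset) {
        pending.put(partition, upstreamOffset);
        firePending();
    }

    // Periodic path: retry syncs that could not be published earlier.
    synchronized void commit() {
        firePending();
    }

    // Assumed callback for when a send finishes and frees an in-flight slot.
    void onSendComplete() {
        inFlight.release();
    }

    // Publishes pending syncs in map order until the in-flight limit is hit.
    private void firePending() {
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext() && inFlight.tryAcquire()) {
            published.add(it.next().getValue());
            it.remove();
        }
    }

    synchronized List<Long> published() {
        return new ArrayList<>(published);
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

With a limit of one in-flight sync, a sync that is blocked during `commitRecord` stays queued (and is overwritten by any newer offset for the same partition) until a later `commit` call finds a free slot.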
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1093398759


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   SpotBugs wasn't giving me any grief. But it is inconsistent.
   
   I wanted there to be a single place where `pendingOffsetSyncs` was declared in order to guarantee that it was a `LinkedHashMap` with the ordering logic that we needed.
   
   After thinking it over a bit, I think it should be fine to change the left-hand type of the field to `LinkedHashMap` and use the no-args constructor to instantiate it in two places instead of one.
   
   With that, it's no different than the `consumerAccess` semaphore.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -194,41 +200,63 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.debug("No more pending offset syncs");

Review Comment:
   Ack, good call.





[GitHub] [kafka] C0urante commented on pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13181:
URL: https://github.com/apache/kafka/pull/13181#issuecomment-1419098772

   Thanks Mickael!




[GitHub] [kafka] C0urante commented on a diff in pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1093398759


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   SpotBugs wasn't giving me any grief. But it is inconsistent.
   
   I wanted there to be a single place where `pendingOffsetSyncs` was declared in order to guarantee that it was a `LinkedHashMap` with the ordering logic that we needed.
   
   After thinking it over a bit, I think it should be fine to change the left-hand type of the field to `LinkedHashMap` and use the no-args constructor to instantiate it in two places instead of one.
   
   With that, it's no different than the `consumerAccess` semaphore.
   
   EDIT: Actually, with that approach, SpotBugs does start failing the build. So I think we can leave this as-is for now instead of adding synchronized blocks in `start` and/or a constructor.
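
For illustration, a rough sketch of the "leave as-is" shape discussed in this comment: the map is instantiated in a single place and `start` only clears it. The class and method names other than `pendingOffsetSyncs` and `start` are hypothetical, and the sketch assumes `start` runs before any other thread touches the task, which is why it is left unsynchronized as in the diff above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified task; only the field handling follows the discussion.
class RestartableTaskSketch {
    // Instantiated in exactly one place, so it is always a LinkedHashMap.
    private final Map<String, Long> pendingOffsetSyncs = new LinkedHashMap<>();

    // Reuses the same map on (re)start instead of instantiating a new one.
    // Unsynchronized, on the assumption that nothing else runs concurrently yet.
    void start() {
        pendingOffsetSyncs.clear();
    }

    // Illustrative helper: queue the latest offset for a partition.
    synchronized void queue(String partition, long offset) {
        pendingOffsetSyncs.put(partition, offset);
    }

    // Illustrative helper: snapshot of partitions in iteration order.
    synchronized List<String> pendingPartitions() {
        return new ArrayList<>(pendingOffsetSyncs.keySet());
    }
}
```

A `LinkedHashMap` created with the no-args constructor iterates in insertion order, and re-inserting an existing key does not move it, so a partition that was queued first stays first even if its offset is updated later.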





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1092643889


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   nit: findbugs doesn't complain about this not being synchronized? you must be on better terms with it than I am.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -194,41 +200,63 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.debug("No more pending offset syncs");

Review Comment:
   nit: these log statements could be spammy if they're called on every commitRecord, wdyt about `trace`?





[GitHub] [kafka] mimaison merged pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison merged PR #13181:
URL: https://github.com/apache/kafka/pull/13181

