Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/01 15:42:25 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

C0urante commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1093398759


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   SpotBugs wasn't giving me any grief, but it is inconsistent.
   
   I wanted there to be a single place where `pendingOffsetSyncs` was declared in order to guarantee that it was a `LinkedHashMap` with the ordering logic that we needed.
   
   After thinking it over a bit, I think it should be fine to change the declared type of the field to `LinkedHashMap` and instantiate it with the no-args constructor in two places instead of one.
   
   With that, it's no different than the `consumerAccess` semaphore.
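
A minimal sketch of the idea, not the actual change in the PR: `PendingSyncsSketch`, `queue`, and `pendingKeys` are hypothetical names, and `String`/`Long` stand in for `TopicPartition`/`OffsetSync`. It assumes the second instantiation lands in `start()`, replacing the `clear()` call shown in the hunk above. Declaring the field as `LinkedHashMap` rather than `Map` keeps the insertion-order guarantee visible in the type, even though the constructor call appears twice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical stand-in for MirrorSourceTask; only the field handling is illustrated.
public class PendingSyncsSketch {
    // Declared as LinkedHashMap (not Map), so the ordering guarantee is part of the field's type.
    // The no-args constructor gives insertion order.
    private LinkedHashMap<String, Long> pendingOffsetSyncs = new LinkedHashMap<>();

    // Assumed equivalent of start(): swap in a fresh map instead of calling clear().
    public synchronized void start() {
        pendingOffsetSyncs = new LinkedHashMap<>();
    }

    // Re-putting an existing key updates its value but keeps its original position.
    public synchronized void queue(String topicPartition, long upstreamOffset) {
        pendingOffsetSyncs.put(topicPartition, upstreamOffset);
    }

    // Snapshot of the keys in iteration (insertion) order.
    public synchronized List<String> pendingKeys() {
        return new ArrayList<>(pendingOffsetSyncs.keySet());
    }
}
```

With this shape, a partition that is queued again before its sync is published keeps its place in line, and a restarted task begins with an empty map.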



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -194,41 +200,63 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.debug("No more pending offset syncs");

Review Comment:
   Ack, good call.


