Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/02/01 01:11:36 UTC

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

gharris1727 commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1092643889


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   nit: findbugs doesn't complain about `pendingOffsetSyncs.clear()` not being synchronized? You must be on better terms with it than I am.
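The question above depends on whether every access to `pendingOffsetSyncs` happens under the task's monitor. Here is a minimal sketch that assumes the map is meant to be guarded by `this`. It routes every access through `synchronized` methods, including the `clear()` that `start()` performs. `PendingSyncs` and its methods are hypothetical. Partitions are plain `String`s rather than Kafka's `TopicPartition`, so the sketch compiles without Kafka on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the task's pending-sync bookkeeping; not MirrorSourceTask code.
// Every access to the map, including clear(), takes the same monitor, so a thread calling
// queue() (as commitRecord() would) and a thread calling clear() never modify it at once.
class PendingSyncs {
    // A newer sync for the same partition replaces the older one,
    // so at most one sync per partition is ever queued.
    private final Map<String, Long> pending = new LinkedHashMap<>();

    synchronized void queue(String partition, long downstreamOffset) {
        pending.put(partition, downstreamOffset);
    }

    // What start() would call; synchronized like every other accessor.
    synchronized void clear() {
        pending.clear();
    }

    synchronized int size() {
        return pending.size();
    }

    synchronized Long get(String partition) {
        return pending.get(partition);
    }
}
```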



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -194,41 +200,63 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.debug("No more pending offset syncs");

Review Comment:
   nit: these log statements could be spammy if they're called on every `commitRecord`; wdyt about `trace`?
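To make the trade-off concrete, here is a hedged sketch of a drain loop shaped like `firePendingOffsetSyncs()`. Because `commitRecord` calls the drain on every record and the loop always exits through one of its log branches, any message at those exits is emitted once per committed record. Several assumptions apply. `SyncDrainer`, `queue`, `firePending`, and `onSyncAcked` are hypothetical names. `java.util.logging`'s `FINEST` stands in for SLF4J's `trace`. Keeping the `Semaphore` permit check from the removed `sendOffsetSync` is a guess at how the truncated loop continues, not necessarily what the PR does.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.logging.Logger;

// Hypothetical sketch, not the PR's code. FINEST plays the role of SLF4J trace.
class SyncDrainer {
    private static final Logger LOG = Logger.getLogger(SyncDrainer.class.getName());

    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Assumed cap on in-flight syncs, like the semaphore the old sendOffsetSync used.
    private final Semaphore outstanding;
    private final Consumer<Long> sender;

    SyncDrainer(int maxOutstanding, Consumer<Long> sender) {
        this.outstanding = new Semaphore(maxOutstanding);
        this.sender = sender;
    }

    synchronized void queue(String partition, long offset) {
        pending.put(partition, offset); // newer sync for a partition overwrites older one
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    // A producer send callback would call this once a sync is acknowledged.
    void onSyncAcked() {
        outstanding.release();
    }

    void firePending() {
        while (true) {
            long offset;
            synchronized (this) {
                Iterator<Long> it = pending.values().iterator();
                if (!it.hasNext()) {
                    // Reached at the end of most calls, hence the lowest log level.
                    LOG.finest("No more pending offset syncs");
                    return;
                }
                if (!outstanding.tryAcquire()) {
                    // Leave the sync queued; a later call retries it (or a newer one).
                    LOG.finest("Too many in-flight offset syncs; will retry later");
                    return;
                }
                offset = it.next();
                it.remove();
            }
            // Send outside the lock so a slow send does not block queue().
            sender.accept(offset);
        }
    }
}
```

Because syncs are coalesced by partition, a sync that cannot be sent for lack of a permit is overwritten by any newer sync for the same partition. The queue therefore never holds more than one entry per partition.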



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org