Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/01 19:43:57 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r740478689



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       As you point out, the old log message was:
   ```
    log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
   ```
   This log message had two things it'd be nice to keep:
   1. `this` as the context; and
   2. the number of records whose offsets were being committed (i.e., the number of acked records).
   
   I think both would be good to include, especially now that we're logging the number of records whose offsets are _not_ being committed (yet).
   
   The `Pending` class seems pretty useful, but it doesn't let us compute the number of acked records here. WDYT about merging the `SubmittedRecords.committableOffsets()` and `pending()` methods by having the former return an object that contains the offset map _and_ the metadata used for logging? That class would be like `Pending`, though `CommittableOffsets` may be a more apt name. Plus, `WorkerSourceTask` would then need only one volatile field, updated atomically.
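
   To make the suggestion concrete, here is a minimal sketch of what such a merged return type might look like. Everything in it is an assumption for illustration: the field names, the `hasPending()` and `describe(...)` helpers, and the wording of the message are hypothetical and not taken from the PR. It only shows the idea of carrying the offset map together with the acked and pending counts in a single immutable object.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical value class: names and fields are illustrative, not the PR's actual API.
final class CommittableOffsets {
    // Shared instance for "nothing to commit and nothing pending".
    static final CommittableOffsets EMPTY =
            new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);

    private final Map<Map<String, Object>, Map<String, Object>> offsets;
    private final int numCommittableMessages;   // acked records covered by this commit
    private final int numUncommittableMessages; // records still waiting for an ack
    private final int numDeques;                // source partitions with pending records
    private final int largestDequeSize;
    private final Map<String, Object> largestDequePartition;

    CommittableOffsets(Map<Map<String, Object>, Map<String, Object>> offsets,
                       int numCommittableMessages,
                       int numUncommittableMessages,
                       int numDeques,
                       int largestDequeSize,
                       Map<String, Object> largestDequePartition) {
        // Defensive copy so the snapshot cannot change after it is handed out.
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
        this.numCommittableMessages = numCommittableMessages;
        this.numUncommittableMessages = numUncommittableMessages;
        this.numDeques = numDeques;
        this.largestDequeSize = largestDequeSize;
        this.largestDequePartition = largestDequePartition;
    }

    Map<Map<String, Object>, Map<String, Object>> offsets() { return offsets; }
    int numCommittableMessages() { return numCommittableMessages; }
    int numUncommittableMessages() { return numUncommittableMessages; }
    int numDeques() { return numDeques; }
    int largestDequeSize() { return largestDequeSize; }
    Map<String, Object> largestDequePartition() { return largestDequePartition; }

    boolean hasPending() { return numUncommittableMessages > 0; }

    // Builds one message that keeps the task context and reports both counts.
    String describe(Object context) {
        if (!hasPending()) {
            return String.format(
                    "%s committing offsets for %d acknowledged messages; no messages are pending",
                    context, numCommittableMessages);
        }
        return String.format(
                "%s committing offsets for %d acknowledged messages; %d pending messages across "
                        + "%d source partitions will not be committed yet (largest: %s with %d)",
                context, numCommittableMessages, numUncommittableMessages,
                numDeques, largestDequePartition, largestDequeSize);
    }
}
```

   With a type along these lines, the task could hold a single reference (for example, one `volatile CommittableOffsets` field initialized to `EMPTY`) and swap it in one assignment, rather than updating the offset map and the pending metadata separately.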
   



