Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/16 17:30:24 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

C0urante commented on code in PR #13262:
URL: https://github.com/apache/kafka/pull/13262#discussion_r1108806764


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -258,6 +258,21 @@ private void commitTransaction() {
 
         long started = time.milliseconds();
 
+        if (!transactionOpen && !offsetWriter.willFlush()) {

Review Comment:
   I'm wondering if the willFlush method is still necessary. Can we remove it completely and use beginFlush in its place? We don't need to make sure a transaction is open before invoking beginFlush, since it doesn't actually produce anything to Kafka.
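
   A minimal sketch of the suggestion, under stated assumptions: SketchOffsetWriter and shouldSkipCommit are hypothetical stand-ins written for illustration, not the real Connect classes, and the only behavior assumed of beginFlush is that it snapshots pending offsets in memory and reports whether there is anything to flush.

```java
import java.util.HashMap;
import java.util.Map;

class BeginFlushSketch {

    /** Hypothetical, simplified stand-in for the offset writer (not the real Connect class). */
    static class SketchOffsetWriter {
        private Map<String, Long> pending = new HashMap<>();
        private Map<String, Long> toFlush = null;

        /** Records an offset in memory; nothing is sent anywhere. */
        void offset(String partition, long offset) {
            pending.put(partition, offset);
        }

        /**
         * Snapshots the pending offsets. This only touches in-memory state,
         * so it is safe to call whether or not a transaction is open.
         *
         * @return true if there are offsets to flush, false otherwise
         */
        boolean beginFlush() {
            if (pending.isEmpty()) {
                return false;
            }
            toFlush = pending;
            pending = new HashMap<>();
            return true;
        }

        /** Number of offsets captured by the most recent successful snapshot. */
        int snapshotSize() {
            return toFlush == null ? 0 : toFlush.size();
        }
    }

    /**
     * Decides whether the commit can be skipped, using the result of
     * beginFlush() where a separate willFlush() check would otherwise go.
     */
    static boolean shouldSkipCommit(boolean transactionOpen, SketchOffsetWriter writer) {
        boolean shouldFlush = writer.beginFlush();
        return !transactionOpen && !shouldFlush;
    }

    public static void main(String[] args) {
        SketchOffsetWriter writer = new SketchOffsetWriter();
        // No open transaction and no pending offsets.
        System.out.println("skip: " + shouldSkipCommit(false, writer));
        // An offset (e.g. for a filtered record) is pending, but no transaction is open.
        writer.offset("partition-0", 42L);
        System.out.println("skip: " + shouldSkipCommit(false, writer));
    }
}
```

   The point of the sketch is that a single call both answers "is there anything to flush?" and prepares the snapshot, so a separate look-ahead method may be redundant. Whether that holds for the real writer depends on details not shown in this diff.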



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -258,6 +258,21 @@ private void commitTransaction() {
 
         long started = time.milliseconds();
 
+        if (!transactionOpen && !offsetWriter.willFlush()) {
+            // There is no contents on the framework side to commit, so skip the offset flush and producer commit
+            long durationMillis = time.milliseconds() - started;
+            recordCommitSuccess(durationMillis);
+            log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
+
+            // Synchronize in order to guarantee that writes on other threads are picked up by this one
+            synchronized (commitableRecords) {
+                commitableRecords.forEach(this::commitTaskRecord);
+                commitableRecords.clear();
+            }

Review Comment:
   This part shouldn't be necessary, should it? There should be no committable records if there's no open transaction and we don't have any offsets for filtered records to commit.


