Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/02/16 17:18:47 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

gharris1727 opened a new pull request, #13262:
URL: https://github.com/apache/kafka/pull/13262

   Source tasks in non-EOS mode periodically call SourceTask::commit even if no records are returned from poll.
   This change adds that behavior to EOS mode by considering empty batches to be dispatched immediately, thus giving the transaction boundary manager an opportunity to initiate a commit.
   
   This does not address tasks that spend a long time in poll(). In non-EOS mode, these tasks would get concurrent commit() calls, while in EOS mode the commit() calls will only occur while a poll() is not in progress. Tasks that want to receive periodic commit() calls should periodically release control by returning null from poll().
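
As a rough illustration of "periodically release control", the sketch below bounds the time spent waiting inside poll() and returns null when nothing arrives. It is a stdlib-only stand-in, not code from this PR and not the Kafka Connect API: the class `BoundedPollSketch`, its `offer` method, and the `maxWaitMillis` parameter are hypothetical names chosen for the example, and records are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source task; it does not extend the real SourceTask class.
class BoundedPollSketch {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final long maxWaitMillis;

    BoundedPollSketch(long maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    // Hypothetical hook used by whatever feeds data to the task.
    void offer(String record) {
        pending.add(record);
    }

    // Mirrors the shape of poll(): waits at most maxWaitMillis, then returns
    // null so the caller regains control instead of blocking indefinitely.
    List<String> poll() {
        String first;
        try {
            first = pending.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and hand control back to the caller.
            Thread.currentThread().interrupt();
            return null;
        }
        if (first == null) {
            return null; // nothing arrived within the wait window
        }
        List<String> batch = new ArrayList<>();
        batch.add(first);
        pending.drainTo(batch); // pick up anything else already queued
        return batch;
    }
}
```

With a short wait window, a task shaped like this hands control back to its caller on a regular cadence even when its source is idle, which is the condition described above for receiving commit() calls in EOS mode.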
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13262:
URL: https://github.com/apache/kafka/pull/13262#issuecomment-1433640900

   > This is covered by the existing `testPollReturnsNoRecords` where the two arguments to verifyTransactions are different.
   
   Ah yes, thanks. Sorry for missing that.
   
   > Also the tests uncovered a difference in behavior for `testSlowTaskStart`: If the task is started and then stopped before the start() method completes, the final task shutdown now calls commit().
   
   Good catch. I think this is reasonable considering it matches the behavior in non-EOS mode.




[GitHub] [kafka] C0urante commented on pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13262:
URL: https://github.com/apache/kafka/pull/13262#issuecomment-1434971635

   Addressed the Mockito migration issues and backported to 3.3.




[GitHub] [kafka] gharris1727 commented on pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13262:
URL: https://github.com/apache/kafka/pull/13262#issuecomment-1433589831

   > Could we also add a case to ExactlyOnceWorkerSourceTaskTest that ensures that SourceTask::commit is invoked even if the task doesn't produce any records?
   
   This is covered by the existing `testPollReturnsNoRecords` where the two arguments to verifyTransactions are different.
   
   Also the tests uncovered a difference in behavior for `testSlowTaskStart`: If the task is started and then stopped before the start() method completes, the final task shutdown now calls commit().




[GitHub] [kafka] C0urante commented on a diff in pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13262:
URL: https://github.com/apache/kafka/pull/13262#discussion_r1108806764


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -258,6 +258,21 @@ private void commitTransaction() {
 
         long started = time.milliseconds();
 
+        if (!transactionOpen && !offsetWriter.willFlush()) {

Review Comment:
   I'm wondering if the willFlush method is still necessary. Can we remove it completely and replace it with beginFlush? We don't have to worry about making sure we have an open transaction before invoking that method since it doesn't actually produce anything to Kafka.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -258,6 +258,21 @@ private void commitTransaction() {
 
         long started = time.milliseconds();
 
+        if (!transactionOpen && !offsetWriter.willFlush()) {
+            // There is no contents on the framework side to commit, so skip the offset flush and producer commit
+            long durationMillis = time.milliseconds() - started;
+            recordCommitSuccess(durationMillis);
+            log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
+
+            // Synchronize in order to guarantee that writes on other threads are picked up by this one
+            synchronized (commitableRecords) {
+                commitableRecords.forEach(this::commitTaskRecord);
+                commitableRecords.clear();
+            }

Review Comment:
   
   This part shouldn't be necessary, should it? There should be no committable records if there's no open transaction and we don't have any offsets for filtered records to commit.





[GitHub] [kafka] C0urante commented on pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13262:
URL: https://github.com/apache/kafka/pull/13262#issuecomment-1433896571

   CI looks good, with some acceptable exceptions:
   
   - Failures due to the recent changes to the Connect `PluginInfoTest`, which are addressed separately in https://github.com/apache/kafka/pull/13266 and are not related to this PR
   - Unrelated failures in core modules
   - Some Jenkins nodes are stuck (have been queued for several hours without building)
   
   Given this, I feel comfortable merging. Thanks again, Greg!




[GitHub] [kafka] C0urante merged pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13262:
URL: https://github.com/apache/kafka/pull/13262




[GitHub] [kafka] C0urante commented on pull request #13262: KAFKA-14727: Enable periodic offset commits for EOS source tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13262:
URL: https://github.com/apache/kafka/pull/13262#issuecomment-1433903556

   Backported to 3.4. There were merge conflicts in the `ExactlyOnceWorkerSourceTaskTest` suite (caused by the Mockito migration) on 3.3; I may revisit at some point if I can find the time, or if anyone else wants to file a PR targeting 3.3 with a cherry-pick of aea6090.

