Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/02/11 00:46:40 UTC

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1103447933


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > Given that, do we really need a separate method here, or can we relax the constraints in beginFlush to wait for in-progress flushes to conclude instead of throwing an exception if there are any?
   
   I implemented it as two separate methods to minimize disturbance to the other call sites, specifically the ExactlyOnceSourceTask. This change introduces a new way to fail (a timeout), and the ExactlyOnceSourceTask doesn't respect the flush timeout, so I thought the change might be undesirable there, where the existing assertion does no harm.
   
   Upon reflection, I think diverging the usage patterns of the OffsetStorageWriter in these two different contexts is more error-prone than migrating both of them to the new semantics, so I'll update the PR to incorporate your feedback.
   
   > Would a Semaphore or CountDownLatch be more suited?
   
   Thanks, that makes a lot more sense. I was originally trying to reuse the doFlush future, but since the `flushing()` state begins in `beginFlush` (before that future exists), I needed a synchronizer created in `beginFlush` itself.
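The Semaphore suggestion can be sketched as follows. This is a minimal, hypothetical illustration, not the PR's actual code: `SketchOffsetWriter`, `inFlight`, `completeFlush` and the String-to-Long offset map are all assumptions. A single-permit `Semaphore` stands in for the synchronizer created in `beginFlush`, and `beginFlush` waits up to a timeout for a concurrent flush to finish instead of failing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified offset writer; NOT Kafka's OffsetStorageWriter.
// A single-permit Semaphore represents "a flush is in progress": beginFlush
// acquires it (waiting up to a timeout) and completeFlush/cancelFlush release it.
class SketchOffsetWriter {
    private final Semaphore flushPermit = new Semaphore(1);
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null;

    synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // Waits for a concurrent flush to finish instead of failing immediately.
    boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        // Block outside the monitor so completeFlush()/cancelFlush() can still run.
        if (!flushPermit.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Timed out waiting for the previous flush to finish");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                flushPermit.release(); // nothing to flush, so no flush is in progress
                return false;
            }
            toFlush = data;          // snapshot the offsets to write
            data = new HashMap<>();  // new offsets accumulate separately
            return true;
        }
    }

    synchronized Map<String, Long> inFlight() {
        return toFlush;
    }

    // Called when the (asynchronous) write succeeded.
    synchronized void completeFlush() {
        if (toFlush != null) {
            toFlush = null;
            flushPermit.release();
        }
    }

    // Called when the write failed: merge the snapshot back so it is retried.
    synchronized void cancelFlush() {
        if (toFlush != null) {
            toFlush.putAll(data); // offsets recorded since the snapshot take precedence
            data = toFlush;
            toFlush = null;
            flushPermit.release();
        }
    }
}
```

The permit is acquired outside the object's monitor so that the thread finishing the previous flush can still release it. With a `CountDownLatch` one would instead create a fresh latch per flush, since a latch cannot be reset.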
   
   > Finally--since this change may lead to us performing double offset commits when a task is being shut down, do you think it might also make sense to add a close method to the offset writer that throws an exception for any further attempts to flush, and possibly forcibly terminates any in-progress flushes? We can invoke that in AbstractWorkerTask::cancel (or possibly WorkerSourceTask::cancel if a different approach is necessary to preserve exactly-once semantics) to help tasks complete shutdown within the timeout allotted to them.
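The proposed close semantics (reject any further flush and forcibly stop an in-progress one) could look roughly like this. It is a hypothetical sketch: `ClosableFlushGuard`, `ensureOpen` and `track` are illustrative names, not Kafka APIs, and it assumes the in-progress flush is represented by a `java.util.concurrent.Future`.

```java
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper illustrating the proposed close() semantics; not a Kafka API.
class ClosableFlushGuard {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile Future<?> inProgress = null;

    // Call at the start of every flush attempt.
    void ensureOpen() {
        if (closed.get()) {
            throw new IllegalStateException("Offset writer is closed; refusing to start a new flush");
        }
    }

    // Remember the pending write so that close() can interrupt it.
    void track(Future<?> flush) {
        inProgress = flush;
        if (closed.get()) {
            flush.cancel(true); // close() raced with this flush; don't let it continue
        }
    }

    // Would be invoked from the task's cancel(): reject future flushes, abort the current one.
    void close() {
        closed.set(true);
        Future<?> current = inProgress;
        if (current != null) {
            current.cancel(true);
        }
    }
}
```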
   
   In EOS mode, if the task is cancelled before it comes time to perform the final offset commit, the final offset commit is skipped. This does not change in this PR, because the ExactlyOnceSourceTask should never leave a flush open and have to wait for a previous flush to finish.
   
   In non-EOS mode, there is no such check of the cancelled flag. It appears that WorkerSourceTask::finalOffsetCommit already contains wait conditions intended to maximize the number of records included in the flush, and those waits may cause the thread to commit offsets after cancellation. I don't think this PR makes double commits possible where they weren't before.
   
   WDYT about adding the EOS-style cancellation semantics to the final commit, or closing the OffsetBackingStore in cancel(), to address these cases? Could we explore those changes in a follow-up PR?
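Applying the EOS-style cancellation semantics to the non-EOS final commit might look like the following. This is a hypothetical sketch, not WorkerSourceTask: `SketchSourceTask` and its methods are illustrative, and the `Runnable` parameter merely stands in for the existing wait in the final commit. The key point is re-checking the cancelled flag after that wait.

```java
// Hypothetical sketch, not WorkerSourceTask: the final commit re-checks the
// cancelled flag after its wait, so a cancelled task skips the commit (as in
// EOS mode) instead of committing after cancellation.
class SketchSourceTask {
    private volatile boolean cancelled = false;
    private int commits = 0;

    void cancel() {
        cancelled = true;
    }

    // waitForOutstandingRecords stands in for the existing wait in the final commit.
    boolean finalOffsetCommit(Runnable waitForOutstandingRecords) {
        if (cancelled) {
            return false;
        }
        waitForOutstandingRecords.run();
        if (cancelled) {
            // Cancelled while waiting: skip the commit rather than commit late.
            return false;
        }
        commitOffsets();
        return true;
    }

    private void commitOffsets() {
        commits++; // placeholder for the real offset flush
    }

    int commitCount() {
        return commits;
    }
}
```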


