Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/08 17:22:44 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1100459265


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   It seems like we're bending over backwards here to accommodate an assumption made in `beginFlush` that we'll never try to trigger two offset flushes at once, which is clearly false given the conditions that necessitate this fix (i.e., a task's end-of-life offset flush is triggered at the same time as its periodic offset flush).
   
   Given that, do we really need a separate method here, or can we relax the constraints in `beginFlush` to wait for in-progress flushes to conclude instead of throwing an exception if there are any?
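   To illustrate what a relaxed `beginFlush` could look like, here is a minimal sketch, not the actual `OffsetStorageWriter` code. Assumptions: the class name `WaitingOffsetWriter` and the `completeFlush` method are hypothetical; offsets are modeled as a plain `Map<String, Long>` with no serialization or backing store; and the timeout is a plain `long` rather than the `Supplier<Long>` used in the diff. Instead of throwing when `toFlush != null`, `beginFlush` waits on the writer's monitor until the in-progress flush concludes or the deadline passes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, heavily simplified stand-in for OffsetStorageWriter:
// only the flush-coordination logic is modeled.
class WaitingOffsetWriter {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null; // snapshot being flushed, or null

    public synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // Only called while holding the monitor.
    private boolean flushing() {
        return toFlush != null;
    }

    // Waits for any in-progress flush to conclude instead of throwing.
    // Returns false if there is nothing new to flush.
    public synchronized boolean beginFlush(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (flushing()) { // loop guards against spurious wakeups
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("Timed out waiting for in-progress offset flush");
            }
            // Releases the monitor while waiting, reacquires it before returning
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        if (data.isEmpty()) {
            return false;
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // Hypothetical hook: marks the current flush as finished and wakes waiters.
    public synchronized void completeFlush() {
        toFlush = null;
        notifyAll();
    }
}
```

   With this shape, a task's end-of-life flush that races with its periodic flush simply queues behind it rather than failing.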
   
   Additionally, the use of a `CompletableFuture` here seems a bit strange for coordinating flushes. Would a `Semaphore` or `CountDownLatch` be better suited?
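   As a rough sketch of the `Semaphore` option (hypothetical names throughout: `SemaphoreOffsetWriter` and `completeFlush` are not existing Kafka APIs, and offsets are simplified to a `Map<String, Long>`): a single permit is held from the start of a flush until it completes, and `tryAcquire(timeout, unit)` gives a bounded wait for free. A `Semaphore` fits better than a `ReentrantLock` here because permits have no owning thread, so the flush can be completed from a different thread than the one that began it. A `CountDownLatch` would also work, but since it cannot be reset, a fresh latch would be needed for every flush.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a single-permit Semaphore marks "a flush is in progress".
class SemaphoreOffsetWriter {
    // One permit, held from beginFlush until completeFlush.
    private final Semaphore flushPermit = new Semaphore(1);
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null; // snapshot being flushed, or null

    public synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // Blocks up to the timeout for an in-progress flush to release the permit.
    public boolean beginFlush(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!flushPermit.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Timed out waiting for in-progress offset flush");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                flushPermit.release(); // nothing to flush, so hand the permit back
                return false;
            }
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    // May be called from any thread (e.g. a storage callback), because
    // Semaphore permits are not owned by the thread that acquired them.
    public void completeFlush() {
        boolean wasFlushing;
        synchronized (this) {
            wasFlushing = toFlush != null;
            toFlush = null;
        }
        if (wasFlushing) {
            flushPermit.release(); // only return a permit that beginFlush acquired
        }
    }
}
```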
   
   Finally, since this change may lead to us performing double offset commits when a task is being shut down, do you think it might also make sense to add a `close` method to the offset writer that throws an exception for any further attempts to flush, and possibly forcibly terminates any in-progress flushes? We can invoke that in `AbstractWorkerTask::cancel` (or possibly `WorkerSourceTask::cancel` if a different approach is necessary to preserve exactly-once semantics) to help tasks complete shutdown within the timeout allotted to them.
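   A minimal sketch of the proposed `close` semantics, under stated assumptions: `ClosableOffsetWriter` and `completeFlush` are hypothetical names, offsets are a plain `Map<String, Long>`, and "forcibly terminating" an in-progress flush is modeled only as discarding the in-flight snapshot (cancelling an actual write to the backing store is not modeled). Throwing `IllegalStateException` is likewise just one possible choice. After `close()`, new flush attempts are rejected, and callers already blocked waiting for a flush are woken so they fail fast instead of consuming the task's shutdown timeout.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of an offset writer that can be closed during task cancellation.
class ClosableOffsetWriter {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null; // snapshot being flushed, or null
    private boolean closed = false;

    public synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    public synchronized boolean beginFlush(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            // Checked on every wakeup so waiters fail fast once close() is called
            if (closed) {
                throw new IllegalStateException("Offset writer is closed; refusing to flush");
            }
            if (toFlush == null) {
                break; // no flush in progress
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("Timed out waiting for in-progress offset flush");
            }
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        if (data.isEmpty()) {
            return false;
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    public synchronized void completeFlush() {
        toFlush = null;
        notifyAll();
    }

    // Rejects further flushes and abandons the in-flight snapshot (the sketch's
    // stand-in for forcibly terminating an in-progress flush).
    public synchronized void close() {
        closed = true;
        toFlush = null;
        notifyAll(); // wake any beginFlush callers so they observe closed == true
    }
}
```

   In this sketch, a task's cancellation path would call `close()` on the writer; how that maps onto `AbstractWorkerTask::cancel` or `WorkerSourceTask::cancel` is left open, as in the comment above.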


