Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/02/07 00:05:02 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

gharris1727 opened a new pull request, #13208:
URL: https://github.com/apache/kafka/pull/13208

   Both the SourceTaskOffsetCommitter and WorkerSourceTask trigger offset commits. Currently, when both threads attempt to start concurrent flushes, the second to call beginFlush receives an exception. The SourceTaskOffsetCommitter swallows this exception, while the WorkerSourceTask propagates it, which prevents the final offset commit from completing cleanly and drops the final offsets.
   
   This change allows the second caller of waitForBeginFlush to wait for the first flush operation to complete, avoiding exceptions if offset flushes are prompt.
   
   Signed-off-by: Greg Harris <gr...@aiven.io>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106203170


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -256,14 +257,25 @@ private void maybeBeginTransaction() {
     private void commitTransaction() {
         log.debug("{} Committing offsets", this);
 
+        long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         long started = time.milliseconds();
+        long deadline = started + commitTimeoutMs;
 
         // We might have just aborted a transaction, in which case we'll have to begin a new one
         // in order to commit offsets
         maybeBeginTransaction();
 
         AtomicReference<Throwable> flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Provide a constant timeout value to wait indefinitely, as there should not be any concurrent flushes.
+            // This is because commitTransaction is always called on the same thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);

Review Comment:
   I agree with the comment about how this method should never be invoked while there are in-progress flushes.
   
   Given that, is there any reason to go to the work of calculating a deadline and deriving a timeout from it, instead of simply invoking this method with a timeout of zero?
   
   We could even add a no-arg variant of `beginFlush` that calls `beginFlush(0, TimeUnit.MILLISECONDS)`.
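
To make the zero-timeout suggestion concrete, here is a minimal sketch of a no-arg overload that delegates to the timed variant. `ZeroTimeoutSketch`, `finishFlush`, and `remainingMs` are hypothetical names used only for illustration and are not part of `OffsetStorageWriter`. The one library behavior it relies on is that `Semaphore.tryAcquire(0, unit)` returns immediately instead of waiting.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the writer; not the Kafka class. */
class ZeroTimeoutSketch {
    // One permit: held from the start of a flush until that flush finishes.
    private final Semaphore flushInProgress = new Semaphore(1);

    /** Timed variant: waits up to the timeout for an earlier flush to finish. */
    public boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), unit)) {
            throw new TimeoutException("previous flush still in progress");
        }
        // This sketch tracks no offset data, so a flush always "starts".
        return true;
    }

    /** No-arg variant: a zero timeout means "fail right away if a flush is in progress". */
    public boolean beginFlush() throws InterruptedException, TimeoutException {
        return beginFlush(0, TimeUnit.MILLISECONDS);
    }

    /** Stand-in for the end of a flush; returns the permit. */
    public void finishFlush() {
        flushInProgress.release();
    }

    /** The deadline-based alternative: remaining budget, clamped at zero. */
    public static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }
}
```

With a single-threaded caller that always finishes or cancels its own flush, the zero-timeout path never has anything to wait for, so the deadline arithmetic in `remainingMs` buys nothing there.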



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -104,44 +101,29 @@ private boolean flushing() {
         return toFlush != null;
     }
 
-    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
-        while (true) {
-            Future<Void> inProgressFlush;
-            synchronized (this) {
-                if (flushing()) {
-                    inProgressFlush = latestFlush;
-                } else {
-                    return beginFlush();
-                }
-            }
-            try {
-                inProgressFlush.get(timeout.get(), timeUnit);
-            } catch (ExecutionException e) {
-                // someone else is responsible for handling this error, we just want to wait for the flush to be over.
-            }
-        }
-    }
-
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
      * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the `timeout` elapses before previous flushes are complete.

Review Comment:
   Nit: Javadocs != markdown, should be `{@code timeout}` (without backticks).





[GitHub] [kafka] C0urante commented on pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13208:
URL: https://github.com/apache/kafka/pull/13208#issuecomment-1432384080

   Test failures are unrelated; merging.




[GitHub] [kafka] mimaison commented on pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13208:
URL: https://github.com/apache/kafka/pull/13208#issuecomment-1420604272

   Thanks @gharris1727 for the PR! The changes look good to me. Since it's an old and tricky issue, I'd like @C0urante to also take a look, if possible, before merging.




[GitHub] [kafka] gharris1727 commented on pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13208:
URL: https://github.com/apache/kafka/pull/13208#issuecomment-1420026975

   I verified that this fix corrects the flaky failures I was seeing in ConnectDistributedTest.test_bounce by manually running the tests with a flush interval of 1ms.




[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107669147


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -100,23 +104,45 @@ private boolean flushing() {
 
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }
 
-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty())
+                    return false;

Review Comment:
   Oh wow, that's pretty serious. I added a unit test that targets this semaphore release.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107664475


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Oh, I understand now, thanks.





[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107391193


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -365,6 +365,10 @@ public void execute() {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } catch (RuntimeException e) {
+            if (isCancelled()) {
+                log.debug("Skipping final offset commit as task has been cancelled");
+                throw e;
+            }

Review Comment:
   Honestly not sure why we didn't put this here to begin with. Nice 👍
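
The cancellation check in this hunk can be illustrated with a small, self-contained sketch. Everything here is hypothetical (`CancelAwareTaskSketch`, the `finalCommitAttempted` flag standing in for the final offset commit); it assumes only what the hunk shows, namely that a cancelled task rethrows the failure without attempting the final commit.

```java
/** Hypothetical stand-in for a source task's execute loop; not the Kafka class. */
class CancelAwareTaskSketch {
    private volatile boolean cancelled = false;
    private boolean finalCommitAttempted = false;

    public void cancel() {
        cancelled = true;
    }

    public boolean finalCommitAttempted() {
        return finalCommitAttempted;
    }

    /** Runs the work; on failure, attempts a final commit only if the task was not cancelled. */
    public void execute(Runnable work) {
        try {
            work.run();
        } catch (RuntimeException e) {
            if (cancelled) {
                // Cancelled tasks skip the final commit and fail immediately.
                throw e;
            }
            // Assumption: a non-cancelled task still tries one last commit before failing.
            finalCommitAttempted = true;
            throw e;
        }
    }
}
```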



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Still think this may apply here



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -100,23 +104,45 @@ private boolean flushing() {
 
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }
 
-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty())
+                    return false;

Review Comment:
   Should we release the semaphore here?
   
   And either way, we should add some detail to the Javadocs for this method and `cancelFlush` / `doFlush` so that people understand the conditions that cause a flush to transition from in-progress to completed.
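
As a rough sketch of the permit lifecycle being asked about, the class below shows one way the in-progress state could transition: the permit is taken in `beginFlush`, handed back immediately when there is no data to snapshot, and otherwise released when the flush completes or is cancelled. `FlushPermitSketch` and `completeFlush` are hypothetical simplifications (string maps, no backing store, no callbacks) and should not be read as the actual `OffsetStorageWriter` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical, simplified writer used only to illustrate the permit lifecycle. */
class FlushPermitSketch {
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    public synchronized void offset(String key, String value) {
        data.put(key, value);
    }

    public boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        // Wait outside the monitor so other methods are not blocked while we wait.
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), unit)) {
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                // No flush actually starts, so return the permit right away.
                flushInProgress.release();
                return false;
            }
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    /** Stand-in for a successful doFlush: in-progress -> completed. */
    public synchronized Map<String, String> completeFlush() {
        if (toFlush == null) {
            throw new IllegalStateException("no flush in progress");
        }
        Map<String, String> flushed = toFlush;
        toFlush = null;
        flushInProgress.release();
        return flushed;
    }

    /** Cancel: put the snapshot back, letting newer offsets win; in-progress -> completed. */
    public synchronized void cancelFlush() {
        if (toFlush != null) {
            toFlush.putAll(data);
            data = toFlush;
            toFlush = null;
            flushInProgress.release();
        }
    }
}
```

Without the release in the empty-data branch, the first `beginFlush` call that finds nothing to flush would keep the permit forever, and every later call would time out.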





[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106236517


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Worth it to add a check to ensure that `beginFlush` times out if it has already been invoked and `doFlush` has not yet been called?
   ```suggestion
           assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
           assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
   ```







[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1100459265


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   It seems like we're bending over backwards here to accommodate an assumption made in `beginFlush` that we'll never try to trigger two offset flushes at once, which is clearly false given the conditions that necessitate this fix (i.e., a task's end-of-life offset flush is triggered at the same time as its periodic offset flush).
   
   Given that, do we really need a separate method here, or can we relax the constraints in `beginFlush` to wait for in-progress flushes to conclude instead of throwing an exception if there are any?
   
   Additionally, it seems like the use of a `CompletableFuture` here is a bit strange. Would a `Semaphore` or `CountDownLatch` be more suited?
   
   Finally--since this change may lead to us performing double offset commits when a task is being shut down, do you think it might also make sense to add a `close` method to the offset writer that throws an exception for any further attempts to flush, and possibly forcibly terminates any in-progress flushes? We can invoke that in `AbstractWorkerTask::cancel` (or possibly `WorkerSourceTask::cancel` if a different approach is necessary to preserve exactly-once semantics) to help tasks complete shutdown within the timeout allotted to them.
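
For illustration, the waiting behavior discussed here can be sketched with a plain `java.util.concurrent.Semaphore`: the second caller blocks in a timed `tryAcquire` until the first flush releases the permit. `WaitForFlushSketch` and `secondCallerWaits` are hypothetical names, and the sleep merely simulates a write to offset storage. One property that makes a semaphore convenient here is that a permit may be released by a different thread from the one that acquired it, which a lock would not allow.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a second flush waiting for the first to finish. */
class WaitForFlushSketch {
    /**
     * Simulates a first flush that holds the permit for holdMs, then reports whether a
     * second caller obtained the permit within waitMs.
     */
    static boolean secondCallerWaits(long holdMs, long waitMs) throws InterruptedException {
        Semaphore flushInProgress = new Semaphore(1);
        flushInProgress.acquire(); // first flush begins on the calling thread

        Thread firstFlush = new Thread(() -> {
            try {
                Thread.sleep(holdMs); // simulated write to offset storage
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                flushInProgress.release(); // first flush completes on another thread
            }
        });
        firstFlush.start();
        try {
            // Second caller waits for the first flush instead of failing immediately.
            return flushInProgress.tryAcquire(waitMs, TimeUnit.MILLISECONDS);
        } finally {
            firstFlush.join();
        }
    }
}
```

Calling `secondCallerWaits` with a short hold and a generous wait should return true, while a long hold with a short wait should return false, which corresponds to the timeout case in the PR.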





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1103447933


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > Given that, do we really need a separate method here, or can we relax the constraints in beginFlush to wait for in-progress flushes to conclude instead of throwing an exception if there are any?
   
   The reason I implemented it as two separate methods was to minimize the disturbance to other call-sites, specifically the ExactlyOnceSourceTask. Because this introduces a new way to fail due to a timeout, and the ExactlyOnceSourceTask doesn't respect the flush timeout, I thought that it might not be a desirable change where the assertion does no harm.
   
   Upon reflection, I think diverging the usage patterns of the OffsetStorageWriter in these two different contexts is more error-prone than migrating both of them to the new semantics, so I'll update the PR to incorporate your feedback.
   
   > Would a Semaphore or CountDownLatch be more suited?
   
   Thanks, that makes a lot more sense. I was originally trying to re-use the doFlush future, but since `flushing()` starts in `beginFlush`, I needed a synchronizer that I created in beginFlush.
   
   > Finally--since this change may lead to us performing double offset commits when a task is being shut down, do you think it might also make sense to add a close method to the offset writer that throws an exception for any further attempts to flush, and possibly forcibly terminates any in-progress flushes? We can invoke that in AbstractWorkerTask::cancel (or possibly WorkerSourceTask::cancel if a different approach is necessary to preserve exactly-once semantics) to help tasks complete shutdown within the timeout allotted to them.
   
   In EOS mode, if the task is cancelled before it comes time to perform the final offset commit, the final offset commit is skipped. This does not change in this PR, because the ExactlyOnceSourceTask should never leave a flush open and have to wait for a previous flush to finish.
   
   In non-EOS mode, the check of the cancelled flag isn't present. It appears that there are already some wait conditions (in WorkerSourceTask::finalOffsetCommit) to maximize the number of records included in the flush, and that wait condition may cause the thread to commit offsets after cancellation. I don't think that this PR makes double commits possible where they weren't before.
   
   WDYT about adding the EOS-style cancellation semantics to the final commit, or closing the OffsetBackingStore in cancel() to address these cases? Do you think that we can explore those changes in a follow-up PR?





[GitHub] [kafka] C0urante merged pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13208:
URL: https://github.com/apache/kafka/pull/13208




[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106250177


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > I don't think that this PR makes double commits possible where they weren't before.
   
   So the issue with double commits in non-EOS mode is that, right now, we may throw an exception because of the bug that we're addressing here. But if we fix that exception, then double commits become possible. And if the first commit takes a while, then we might end up lagging too much and performing our second commit after a new instance of the same source task has been brought up.
   
   > WDYT about adding the EOS-style cancellation semantics to the final commit, or closing the OffsetBackingStore in cancel() to address these cases? Do you think that we can explore those changes in a follow-up PR?
   
   I think adding the EOS-style cancellation semantics would be okay for now, though they aren't as effective for this kind of task since we don't have a way of fencing out producers. We can do that part in this PR, and then file a Jira ticket to improve cancellation logic for source task offset commit, which we can explore at a later point.


