Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/11 20:53:06 UTC

[GitHub] [kafka] C0urante opened a new pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

C0urante opened a new pull request #10112:
URL: https://github.com/apache/kafka/pull/10112


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12226)
   
   When a task fails to commit offsets because not all outstanding records have been ack'd by the broker yet, it's better to retry that same batch on the next attempt. Otherwise, the set of outstanding records can grow indefinitely and all subsequent offset commit attempts can fail. By retrying the same batch, it becomes possible to eventually commit offsets, even when the producer is unable to keep up with the throughput of the records provided to it by the task.
   
   Two unit tests are added to verify this behavior.
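
To make the "retry the same batch" idea concrete, here is a minimal sketch of the bookkeeping. It is not the code in this PR: the class name, the integer record IDs, and the `tryCommit()` method are hypothetical, and a commit "timeout" is modeled simply as the batch not being fully acked when the attempt is made. Only the `recordFlushPending` flag name is taken from the diff discussed in this thread.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of the bookkeeping; integer IDs stand in for in-flight records.
final class RetrySameBatchSketch {
    private final Set<Integer> outstanding = new LinkedHashSet<>(); // batch being flushed
    private final Set<Integer> backlog = new LinkedHashSet<>();     // sent while a flush is pending
    private boolean recordFlushPending = false;

    // Called when the task hands a record to the producer.
    void send(int id) {
        (recordFlushPending ? backlog : outstanding).add(id);
    }

    // Called when the broker acknowledges a record.
    void ack(int id) {
        if (!outstanding.remove(id)) {
            backlog.remove(id);
        }
    }

    // One commit attempt. A real implementation would wait up to a timeout; here a
    // "timeout" is modeled as the batch not being fully acked at the time of the call.
    boolean tryCommit() {
        recordFlushPending = true;       // freeze the batch; new sends go to the backlog
        if (!outstanding.isEmpty()) {
            return false;                // flag stays set, so the next attempt retries the same batch
        }
        outstanding.addAll(backlog);     // batch fully acked: the backlog becomes the next batch
        backlog.clear();
        recordFlushPending = false;
        return true;
    }

    Set<Integer> outstanding() {
        return outstanding;
    }

    public static void main(String[] args) {
        RetrySameBatchSketch task = new RetrySameBatchSketch();
        task.send(1);
        task.send(2);
        System.out.println(task.tryCommit());   // false: 1 and 2 are not acked yet
        task.send(3);                           // lands in the backlog, does not grow the batch
        task.ack(1);
        task.ack(2);
        System.out.println(task.tryCommit());   // true: the original batch {1, 2} is done
        System.out.println(task.outstanding()); // [3]
    }
}
```

The point of the sketch is that record 3 never joins the batch that the second attempt waits on, so a slow producer only has to finish records 1 and 2 for the commit to succeed.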
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-914348989


   @rhauch Overall that looks good to me. It's an elegant solution to the tricky problem you noted about the opacity of task-provided source offsets w/r/t ordering.
   
   I'm a little worried about offset commits taking longer and longer with the more sophisticated approach you proposed (where we would unconditionally iterate over every record in the batch, instead of only until the first unacknowledged record). It's true that there would be natural back pressure from the producer as its `buffer.memory` gets eaten up, but with the default of 32MB, it still seems possible for a large number of unacknowledged records to build up. If this does happen, then offset commits may end up exceeding the `offset.flush.timeout.ms` for the worker, which may cause issues with the current model where a single shared, worker-global thread is used for offset commits of all tasks.
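
As a rough back-of-the-envelope illustration of how many unacknowledged records could pile up under the default 32MB `buffer.memory`, the sketch below divides the buffer size by an average record size. The record sizes are hypothetical values picked for illustration, and the estimate ignores batch overhead and compression, so treat it as an order-of-magnitude guess only.

```java
// Back-of-the-envelope estimate; the class and method names are illustrative only.
final class BufferBacklogEstimate {
    // Default producer buffer.memory: 32 MB (33554432 bytes).
    static final long DEFAULT_BUFFER_MEMORY_BYTES = 32L * 1024 * 1024;

    // Upper bound on the number of records the buffer can hold at once,
    // ignoring per-batch overhead and compression.
    static long maxBufferedRecords(long bufferBytes, long avgRecordBytes) {
        if (avgRecordBytes <= 0) {
            throw new IllegalArgumentException("avgRecordBytes must be positive");
        }
        return bufferBytes / avgRecordBytes;
    }

    public static void main(String[] args) {
        // Hypothetical average record sizes, in bytes.
        for (long size : new long[] {100, 1024, 10 * 1024}) {
            System.out.printf("avg record %d bytes -> up to %d buffered records%n",
                    size, maxBufferedRecords(DEFAULT_BUFFER_MEMORY_BYTES, size));
        }
    }
}
```

With 1 KB records, for instance, that is on the order of tens of thousands of queued records to walk on each offset commit.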
   
   If this is a valid concern and we'd like to take it into account for now, I can think of a couple ways to handle it off the top of my head:
   1. Use the simpler approach that blocks offset commits across the board if a single record remains unacknowledged for a long period of time (which may realistically be a problem if a single partition out of many is unavailable for some reason).
   2. Enable concurrent offset commits by multiple tasks.
   3. Instead of a single deque per task, use a `ConcurrentMap<Map<String, ?>, Queue<SubmittedRecord>>` that stores a single deque per unique source partition. This would allow us to iterate over the bare minimum number of records for every single offset commit and not spend time, for example, on accumulated records for unavailable Kafka partitions. We'd still have to iterate over those records eventually if the Kafka partition came back online, but that iteration would only ever occur once, instead of once for every offset commit.
   
   I think option 3 may be warranted, although it's still possible that offset commits take a long time if 32MB worth of records end up getting queued. Option 2 may be worth implementing or at least considering as a follow-up item to handle this case.
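
A minimal sketch of what option 3 could look like, under some assumptions: the `Submitted` class is a hypothetical stand-in for `SubmittedRecord`, the method names are invented for illustration, and only a single commit thread is assumed to consume from the queues.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

final class PerPartitionQueuesSketch {
    // Hypothetical stand-in for the SubmittedRecord discussed in this thread.
    static final class Submitted {
        final Map<String, ?> sourcePartition;
        final Map<String, ?> sourceOffset;
        private volatile boolean acked;

        Submitted(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
        }

        void acknowledge() { acked = true; }
        boolean isAcknowledged() { return acked; }
    }

    // One queue per unique source partition.
    private final ConcurrentMap<Map<String, ?>, Queue<Submitted>> queues = new ConcurrentHashMap<>();

    // Task thread: enqueue under the record's source partition before sending.
    Submitted submit(Map<String, ?> partition, Map<String, ?> offset) {
        Submitted record = new Submitted(partition, offset);
        queues.computeIfAbsent(partition, p -> new ConcurrentLinkedQueue<>()).add(record);
        return record;
    }

    // Commit thread (assumed to be the only consumer): for each source partition, pop the
    // acknowledged prefix and keep the last offset seen. Records queued behind an unacked
    // head are not visited at all.
    Map<Map<String, ?>, Map<String, ?>> committableOffsets() {
        Map<Map<String, ?>, Map<String, ?>> committable = new HashMap<>();
        for (Map.Entry<Map<String, ?>, Queue<Submitted>> entry : queues.entrySet()) {
            Queue<Submitted> queue = entry.getValue();
            Submitted head;
            while ((head = queue.peek()) != null && head.isAcknowledged()) {
                queue.poll();
                committable.put(entry.getKey(), head.sourceOffset);
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        PerPartitionQueuesSketch records = new PerPartitionQueuesSketch();
        Map<String, String> p1 = Map.of("file", "a.txt");
        Map<String, String> p2 = Map.of("file", "b.txt");
        records.submit(p1, Map.of("pos", 1));                // stays unacked
        records.submit(p1, Map.of("pos", 2)).acknowledge();  // blocked behind pos=1
        records.submit(p2, Map.of("pos", 1)).acknowledge();
        System.out.println(records.committableOffsets());    // only the b.txt partition is committable
    }
}
```

Each commit only touches the head of every per-partition queue plus whatever it actually dequeues, which is the property option 3 is after.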
   
   Thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-777782791


   @gharris1727 @tombentley @chia7712 anyone got a moment? 😃 





[GitHub] [kafka] rhauch edited a comment on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773


   My previous suggestion simply dequeues all completed records until an unacked record is found. This is really straightforward, but we could try to do better.
   
   We could dequeue all acked records except those whose source partition has an earlier unacked record. For example, let's say we have enqueued 10 records when `commitOffsets()` is called:
   ```
   1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
   2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This might happen if records 4, 5, and 7-10 were written to different topic partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter partitions.
   
   With the simplistic logic, we'd only dequeue records 1 and 2, add the offsets for these two records to the offset writer, and flush offset `partition=P1,offset=O2`. We'd end up with the following remaining in the queue (using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   There are quite a few records with source partition `P2` that were acked but not dequeued, simply because they were behind an unacked record with a different source partition.
   
   However, if we dequeue all acked records whose source partition map does not match that of a previously seen unacked record, then we'd be able to dequeue more records and *also* flush offsets `partition=P1,offset=O2` and `partition=P2,offset=O5`. We'd end up with a much smaller queue (again, using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This minor change will dramatically improve the ability to commit offsets closer to what has actually been acked. 
   ```
           // Dequeue all submitted records that have been acknowledged and don't have a source partition with an unacked record
           Set<Map<String, ?>> unackedPartitions = new HashSet<>();
           Iterator<SubmittedRecord> iter = submittedRecords.iterator();
           while (iter.hasNext()) {
               SubmittedRecord next = iter.next();
               SourceRecord record = next.record();
               Map<String, ?> partition = record.sourcePartition();
               if (next.isAcknowledged() && !unackedPartitions.contains(partition)) {
                   // The record is acknowledged and does not share a source partition with an unacknowledged record,
                   // so we can remove it from the queue and write the offsets to the offset writer.
                   iter.remove();
                   offsetWriter.offset(partition, record.sourceOffset());
               } else {
                   // As soon as we see an unacknowledged record, we have to prevent dequeuing all subsequent records that use that same partition
                   unackedPartitions.add(partition);
               }
           }
   ```
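
To check the two strategies against the ten-record example above, here is a small self-contained sketch. It is a simplification: `Rec` is a hypothetical stand-in that uses plain strings for the source partition and offset (the real ones are opaque maps), and a plain map stands in for the offset writer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class DequeueComparison {
    // Hypothetical, simplified stand-in for SubmittedRecord plus its SourceRecord.
    static final class Rec {
        final int number;
        final String partition;
        final String offset;
        final boolean acked;

        Rec(int number, String partition, String offset, boolean acked) {
            this.number = number;
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // The ten records from the example above, in submission order.
    static Deque<Rec> exampleQueue() {
        Deque<Rec> q = new ArrayDeque<>();
        q.add(new Rec(1, "P1", "O1", true));
        q.add(new Rec(2, "P1", "O2", true));
        q.add(new Rec(3, "P1", "O3", false));
        q.add(new Rec(4, "P2", "O1", true));
        q.add(new Rec(5, "P2", "O2", true));
        q.add(new Rec(6, "P1", "O4", false));
        q.add(new Rec(7, "P2", "O3", true));
        q.add(new Rec(8, "P2", "O4", true));
        q.add(new Rec(9, "P2", "O5", true));
        q.add(new Rec(10, "P2", "O6", false));
        return q;
    }

    // Simple strategy: stop at the first unacked record.
    static Map<String, String> dequeueUntilFirstUnacked(Deque<Rec> queue) {
        Map<String, String> offsets = new LinkedHashMap<>();
        while (!queue.isEmpty() && queue.peek().acked) {
            Rec r = queue.poll();
            offsets.put(r.partition, r.offset);
        }
        return offsets;
    }

    // Partition-aware strategy: skip only partitions that already have an unacked record.
    static Map<String, String> dequeueSkippingBlockedPartitions(Deque<Rec> queue) {
        Map<String, String> offsets = new LinkedHashMap<>();
        Set<String> blocked = new HashSet<>();
        Iterator<Rec> iter = queue.iterator();
        while (iter.hasNext()) {
            Rec r = iter.next();
            if (r.acked && !blocked.contains(r.partition)) {
                iter.remove();
                offsets.put(r.partition, r.offset);
            } else {
                blocked.add(r.partition);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        Deque<Rec> q1 = exampleQueue();
        System.out.println(dequeueUntilFirstUnacked(q1) + ", remaining=" + q1.size());
        // prints: {P1=O2}, remaining=8
        Deque<Rec> q2 = exampleQueue();
        System.out.println(dequeueSkippingBlockedPartitions(q2) + ", remaining=" + q2.size());
        // prints: {P1=O2, P2=O5}, remaining=3
    }
}
```

The second strategy leaves only records 3, 6, and 10 in the queue, at the cost of walking the whole queue on every commit.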





[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r583154155



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       > We end up blocking the event thread anyway because of the need to do it under the lock.
   
   I think we actually keep polling the task for records during the offset commit, which is the entire reason we have the `outstandingMessagesBacklog` field. Without it, we'd just add everything to `outstandingMessages` knowing that, if we've made it to the point of adding a record to that collection, we're not in the process of committing offsets, right?
   
   Concretely, we can see that the offset thread [relinquishes the lock on the `WorkerSourceTask` instance while waiting for outstanding messages to be ack'd](https://github.com/C0urante/kafka/blob/03c5a83a8277fa7c4ec503c3e044ae61cff06eea/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L501).
   
   
   I'm not sure we _need_ to perform offset commits on a separate thread, but it is in line with what we do for sink tasks, where we [leverage the `Consumer::commitAsync` method](https://github.com/apache/kafka/blob/e2a0d0c90e1916d77223a420e3595e8aba643001/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L365).
   
   If we want to consider making offset commit synchronous (which is likely going to happen anyways when transactional writes for exactly-once source are introduced), that also might be worth a follow-up. The biggest problem I can think of with that approach would be that a single offline topic-partition would block up the entire task thread when it comes time for offset commit. If we keep the timeout for offset commit, then that'd limit the fallout and allow us to resume polling new records from the task and dispatching them to the producer after the commit attempt timed out. However, there'd still be a non-negligible throughput hit (especially for workers configured with higher offset timeouts).
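
For readers less familiar with Java monitors, here is a minimal sketch of why a commit thread that waits inside a `synchronized` block does not stall the task thread. It assumes the wait is done with `Object.wait(timeout)`, which releases the monitor while blocked. The class, its counters, and the method names are hypothetical and only model the idea, not `WorkerSourceTask` itself.

```java
// Hypothetical model: a commit thread waits under the monitor while the task thread keeps working.
final class MonitorReleaseSketch {
    private int outstanding = 1;          // records sent but not yet acked
    private int backlog = 0;              // records sent while a commit is waiting
    private boolean committerWaiting = false;

    // Commit thread: holds the monitor on entry, but wait() releases it while blocked.
    synchronized boolean awaitFlush(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        committerWaiting = true;
        while (outstanding > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;             // timed out waiting for acks
            }
            wait(remaining);
        }
        return true;
    }

    // Task thread and producer callback: these need the same monitor.
    synchronized void recordSent() { backlog++; }
    synchronized void recordAcked() { outstanding--; notifyAll(); }
    synchronized int backlog() { return backlog; }
    synchronized boolean isCommitterWaiting() { return committerWaiting; }

    // Returns true if the flush completed and a send was recorded while the committer waited.
    static boolean runDemo() {
        MonitorReleaseSketch task = new MonitorReleaseSketch();
        boolean[] flushed = new boolean[1];
        Thread committer = new Thread(() -> {
            try {
                flushed[0] = task.awaitFlush(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        committer.start();
        // This can only observe true once the committer is inside wait() and has released the monitor.
        while (!task.isCommitterWaiting()) {
            Thread.yield();
        }
        task.recordSent();                // succeeds even though the committer is mid-commit
        task.recordAcked();               // wakes the committer
        try {
            committer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return flushed[0] && task.backlog() == 1;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

A fully synchronous commit on the task thread would remove the second thread from this picture, which is where the throughput concern above comes from.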







[GitHub] [kafka] C0urante closed pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante closed pull request #10112:
URL: https://github.com/apache/kafka/pull/10112


   





[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-915395143


   @C0urante wrote:
   > @rhauch If this is all agreeable I think we're ready to start implementing. Since you've provided a lot of the code yourself I'm happy to let you take on that work if you'd like; otherwise, I'll get started and see if I can have a new PR with these changes out by early next week.
   
   Sounds good to me! I'm looking forward to your new PR; please link here and ping me. Thanks!





[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910510910


   First of all, thanks for trying to fix this issue, @C0urante.
   
   And thanks for your insight, @hachikuji. I agree that it seems like we should not have to block the offset commits until the full batch of records has been written to Kafka.
   
   I suspect the current logic was written this way because it's the simplest thing to do, given that the source partition map and offset map in the source records are opaque, meaning we can't sort them and have to instead rely upon the order of the source records returned by the connector. And because the producer can make progress writing to some topic partitions while not making progress on others, it's possible that some records in a batch are written before earlier records in the same batch.
   
   The bottom line is that we have to track offsets that can be committed using only the order of the records that were generated by the source task. The current logic simply blocks committing offsets until each "batch" of records is completely flushed. That way we can commit _all_ of the offsets in the batch together, and let the offset writer rely upon ordering to use only the latest offset map for each partition map when we tell it to flush.
   
   But, flushing offsets requires synchronization, and the current logic switches between the `outstandingMessages` and `outstandingMessagesBacklog` buffers to track the "batches" of records that have to complete for offset commits. It's really sort of a mess.
   
   @hachikuji wrote:
   > The patch gets around the problem by relaxing `offset.flush.timeout.ms` a little bit. Rather than treating expiration of records as a fatal error, we continue to allow more time for `outstandingMessages` to be drained. This ensures that we do not have to wait for the messages from `outstandingMessagesBacklog` which are added while the flush is in progress.
   
   That's my understanding, too. And maybe I don't grasp the subtleties of the fix, but it seems like the fix won't necessarily help when a producer is _consistently_ slow. In such cases, the `outstandingMessages` will fill with the records sent to the producer since the previous commit offsets, and as soon as we start committing offsets all records then get added to `outstandingMessagesBacklog`. If the producer writes records significantly slower than the source task generates them, then `outstandingMessagesBacklog` could be larger than `outstandingMessages` by the time the offsets for `outstandingMessages` are finally committed, especially if we're blocking offset commits even longer with this change. So while we're able to eventually commit those first offsets, if the backlog is larger, then it will likely take longer for the producer to flush those records than it took the producer to flush the first batch.
   
   Fortunately, we do have back pressure to keep this from getting too out of control: when the producer's buffer fills up, the worker source task's thread will block (up to `max.block.ms`) on calls to `producer.send(...)`, and the worker source task will retry any sends that fail after that timeout. And since this is the same thread that calls `poll()`, the worker source task will eventually slow its calls to `poll()`.
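
The back pressure described above can be imitated with a bounded queue. This is only an analogy, not how the producer is implemented: the class and method names are hypothetical, buffer space is counted in records instead of bytes, and a failed send is reported with a `false` return value where the producer would raise an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a bounded queue standing in for the producer's buffer.
final class BackpressureSketch {
    private final BlockingQueue<String> buffer;
    private final long maxBlockMs;

    BackpressureSketch(int capacity, long maxBlockMs) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.maxBlockMs = maxBlockMs;
    }

    // Blocks for up to maxBlockMs when the buffer is full. A false return means the
    // caller should back off and retry this record instead of polling for more.
    boolean trySend(String record) {
        try {
            return buffer.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Simulates an acknowledgement freeing up buffer space.
    void ackOne() {
        buffer.poll();
    }

    public static void main(String[] args) {
        BackpressureSketch producer = new BackpressureSketch(2, 10);
        System.out.println(producer.trySend("a")); // true
        System.out.println(producer.trySend("b")); // true
        System.out.println(producer.trySend("c")); // false: buffer full for the whole 10 ms
        producer.ackOne();
        System.out.println(producer.trySend("c")); // true: the retry succeeds once space frees up
    }
}
```

Because the same thread does the sending and the polling, every blocked or failed send directly delays the next `poll()`.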
   
   But I think we can change how offsets are flushed such that we don't have to wait for the producer, and instead we can simply flush the latest offsets for records that have been successfully written at that point. We just need a different mechanism (other than the two "outstanding" lists and the flush-related flags) to track the offsets for the most recently written records.
   
   One way to do that is to use a single concurrent queue that bookkeeps records in the same order as generated by the source task, but in a way that allows us to track _which_ records have been acked and tolerates those records being acked in any order. 
   
   For example, we could replace the `outstandingMessages` and `outstandingMessagesBacklog` fields in `WorkerSourceTask` with something like this:
   ```
      private final Queue<SubmittedRecord> submittedRecords = new ConcurrentLinkedQueue<>();
   ```
   An element is appended to this queue just before the record is sent to the producer, and the `SubmittedRecord` class allows us to track which of these records has been acknowledged:
   ```
       protected static class SubmittedRecord {
           private final SourceRecord record;
           private final AtomicBoolean acked = new AtomicBoolean();
           public SubmittedRecord(SourceRecord sourceRecord) {
               record = Objects.requireNonNull(sourceRecord);
           }
           public void acknowledge() {
               acked.set(true);
           }
           public boolean isAcknowledged() {
               return acked.get();
           }
           public SourceRecord record() {
               return record;
           }
       }
   
   ```
   and where `acknowledge()` is called from the producer callback and the `commitOffsets()` method can safely call `isAcknowledged()` and `record()` from the commit thread. The `sendRecords()` method would add a `SubmittedRecord` to the end of the queue for each record that will be sent to the producer:
   ```
       private boolean sendRecords() {
           ...
           for (final SourceRecord preTransformRecord : toSend) {
               ...
               SubmittedRecord submittedRecord = new SubmittedRecord(record);
               if (!submittedRecords.offer(submittedRecord)) {
                   // If a blocking queue, then retry using the existing mechanism in WorkerSourceTask
                   log.warn("{} Failed to add record to buffer. Backing off before retrying", this);
                   toSend = toSend.subList(processed, toSend.size());
                   lastSendFailed = true;
                   counter.retryRemaining();
                   return false;
               }
               ...
   ```
   and then have the producer callback call the `SubmittedRecord.acknowledge()` method:
   ```
               try {
                   ...
                   producer.send(
                       producerRecord,
                       (recordMetadata, e) -> {
                           if (e != null) {
                               ...
                           } else {
                               submittedRecord.acknowledge();
                               ...
                           }
                       });
   ```
   This effectively replaces the `outstandingMessages` and `outstandingMessagesBacklog` fields and the `flushing` flag, and it simplifies the logic in `sendRecords()`, which no longer has to know which of those to use.
   
   Then here's the big change: in `commitOffsets()`, we can dequeue all acked records, then take the snapshot of offsets, and immediately flush offsets without waiting for the producer. And by using a concurrent queue, we don't even need to synchronize between the `sendRecords()` method adding to the back of the queue and the `commitOffsets()` pulling from the front of the queue.
   ```
       public boolean commitOffsets() {
           ...
           // Dequeue all submitted records that have been acknowledged, stopping at the first unacknowledged one
           while (!submittedRecords.isEmpty()) {
               SubmittedRecord next = submittedRecords.peek();
               if (!next.isAcknowledged()) {
                   // This record is not yet acknowledged, so we can't continue processing any more offsets
                   break;
               }
               submittedRecords.poll();
               // The record is acknowledged, so add the offsets to the offset writer
               // Offsets are converted & serialized in the OffsetWriter
               SourceRecord record = next.record();
               offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
           }
           ...
           synchronized (this) {
               boolean flushStarted = offsetWriter.beginFlush();
               if (!flushStarted) {
                   ...
               }
           }
   
           // Now we can actually flush the offsets to user storage.
           Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
               if (error != null) {
                   log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
               } else {
                   log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
               }
           });
           ...
       }
   ```
   
   I've shown the snippet above using a non-blocking queue of unlimited size. I think we could do this because the existing WorkerSourceTask logic already handles the possibility that the `producer.send(...)` blocks when its buffer is full, up to `max.block.ms` before throwing a retriable exception, and then retrying the send if needed. Since this happens on the same thread that calls `SourceTask.poll()`, this existing logic already has the backpressure that is based upon the producer setting and that prevents the source task getting too far ahead of the producer.
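
Putting the pieces together, the following self-contained sketch mimics the flow with acks arriving out of order on another thread. It makes some simplifying assumptions: `Submitted` is a hypothetical stand-in with a string partition and a numeric offset, a plain map plays the role of the offset writer, and a single commit thread is assumed to drain the queue.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class SubmittedQueueSketch {
    // Hypothetical, simplified record: string partition and numeric offset instead of opaque maps.
    static final class Submitted {
        final String partition;
        final long offset;
        private final AtomicBoolean acked = new AtomicBoolean();

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void acknowledge() { acked.set(true); }        // would run in the producer callback
        boolean isAcknowledged() { return acked.get(); }
    }

    private final Queue<Submitted> submitted = new ConcurrentLinkedQueue<>();

    // Task thread: enqueue just before handing the record to the producer.
    Submitted submit(String partition, long offset) {
        Submitted record = new Submitted(partition, offset);
        submitted.add(record);
        return record;
    }

    // Commit thread: drain the acknowledged prefix without waiting for the producer.
    Map<String, Long> drainCommittableOffsets() {
        Map<String, Long> offsets = new LinkedHashMap<>();
        Submitted head;
        while ((head = submitted.peek()) != null && head.isAcknowledged()) {
            submitted.poll();
            offsets.put(head.partition, head.offset);
        }
        return offsets;
    }

    int pending() {
        return submitted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SubmittedQueueSketch task = new SubmittedQueueSketch();
        Submitted first = task.submit("P1", 1);
        Submitted second = task.submit("P1", 2);
        task.submit("P2", 1);                               // never acked in this demo

        Thread callbacks = new Thread(second::acknowledge); // ack arrives out of order
        callbacks.start();
        callbacks.join();
        System.out.println(task.drainCommittableOffsets()); // {}: the head is still unacked

        first.acknowledge();
        System.out.println(task.drainCommittableOffsets()); // {P1=2}
        System.out.println(task.pending());                 // 1
    }
}
```

Neither `submit` nor `drainCommittableOffsets` takes a lock; the concurrent queue and the atomic flag carry all the coordination in this sketch.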
   
   Alternatively, we could use a blocking queue, but this would require an additional worker configuration, which is not ideal and can't be backported.
   
   @C0urante, WDYT?





[GitHub] [kafka] rhauch edited a comment on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910510910


   First of all, thanks for trying to fix this issue, @C0urante.
   
   And thanks for your insight, @hachikuji. I agree that it seems like we should not have to block the offset commits until the full batch of records has been written to Kafka.
   
   I suspect the current logic was written this way because it's the simplest thing to do, given that the source partition map and offset map in the source records are opaque, meaning we can't sort them and have to instead rely upon the order of the source records returned by the connector. And because the producer can make progress writing to some topic partitions while not making progress on others, it's possible that some records in a batch are written before earlier records in the same batch.
   
   The bottom line is that we have to track offsets that can be committed using only the order of the records that were generated by the source task. The current logic simply blocks committing offsets until each "batch" of records is completely flushed. That way we can commit _all_ of the offsets in the batch together, and let the offset writer rely upon ordering to use only the latest offset map for each partition map when we tell it to flush.
   
   But, flushing offsets requires synchronization, and the current logic switches between the `outstandingMessages` and `outstandingMessagesBacklog` buffers to track the "batches" of records that have to complete for offset commits. It's really sort of a mess.
   
   @hachikuji wrote:
   > The patch gets around the problem by relaxing `offset.flush.timeout.ms` a little bit. Rather than treating expiration of records as a fatal error, we continue to allow more time for `outstandingMessages` to be drained. This ensures that we do not have to wait for the messages from `outstandingMessagesBacklog` which are added while the flush is in progress.
   
   That's my understanding, too. And maybe I don't grasp the subtleties of the fix, but it seems like the fix won't necessarily help when a producer is _consistently_ slow. In such cases, the `outstandingMessages` will fill with the records sent to the producer since the previous commit offsets, and as soon as we start committing offsets all records then get added to `outstandingMessagesBacklog`. If the producer writes records significantly slower than the source task generates them, then `outstandingMessagesBacklog` could be larger than `outstandingMessages` by the time the offsets for `outstandingMessages` are finally committed, especially if we're blocking offset commits even longer with this change. So while we're able to eventually commit those first offsets, if the backlog is larger then it will likely take longer for the producer to flush those records than it took the producer to flush the first batch. The offset commit thread remains blocked for longer and longer periods of time.
   
   Fortunately, we do have back pressure to keep this from getting too out of control: when the producer's buffer fills up, the worker source task's thread will block (up to `max.block.ms`) on calls to `producer.send(...)`, and the worker source task will retry any sends that fail after that timeout. And since this is the same thread that calls `poll()`, the worker source task will eventually slow its calls to `poll()`.
   
   But I think we can change how offsets are flushed such that we don't have to wait for the producer, and instead we can simply flush the latest offsets for records that have been successfully written at that point. We just need a different mechanism (other than the two "outstanding" lists and the flush-related flags) to track the offsets for the most recently written records.
   
   One way to do that is to use a single concurrent queue that bookkeeps records in the same order as generated by the source task, but in a way that allows us to track _which_ records have been acked and tolerates those records being acked in any order. 
   
   For example, we could replace the `outstandingMessages` and `outstandingMessagesBacklog` fields in `WorkerSourceTask` with something like this:
   ```
      private final Queue<SubmittedRecord> submittedRecords = new ConcurrentLinkedQueue<>();
   ```
   An element is appended to this queue just before the record is sent to the producer, and the `SubmittedRecord` class allows us to track which of these records has been acknowledged:
   ```
       protected static class SubmittedRecord {
           private final SourceRecord record;
           private final AtomicBoolean acked = new AtomicBoolean();
           public SubmittedRecord(SourceRecord sourceRecord) {
               record = Objects.requireNonNull(sourceRecord);
           }
           public void acknowledge() {
               acked.set(true);
           }
           public boolean isAcknowledged() {
               return acked.get();
           }
           public SourceRecord record() {
               return record;
           }
       }
   
   ```
   and where `acknowledge()` is called from the producer callback and the `commitOffsets()` method can safely call `isAcknowledged()` and `record()` from the commit thread. The `sendRecords()` method would add a `SubmittedRecord` to the end of the queue for each record that will be sent to the producer:
   ```
       private boolean sendRecords() {
           ...
           for (final SourceRecord preTransformRecord : toSend) {
               ...
               SubmittedRecord submittedRecord = new SubmittedRecord(record);
               if (!submittedRecords.offer(submittedRecord)) {
                   // If a blocking queue, then retry using the existing mechanism in WorkerSourceTask
                   log.warn("{} Failed to add record to buffer. Backing off before retrying", this);
                   toSend = toSend.subList(processed, toSend.size());
                   lastSendFailed = true;
                   counter.retryRemaining();
                   return false;
               }
               ...
   ```
   and then have the producer callback call the `SubmittedRecord.acknowledge()` method:
   ```
               try {
                   ...
                   producer.send(
                       producerRecord,
                       (recordMetadata, e) -> {
                           if (e != null) {
                               ...
                           } else {
                               submittedRecord.acknowledge();
                               ...
                           }
                       });
   ```
   This effectively replaces the `outstandingMessages` and `outstandingMessagesBacklog` buffers and the `flushing` flag, and it simplifies the logic in `sendRecords()`, which no longer has to know which of those to use.
   
   Then here's the big change: in `commitOffsets()`, we can dequeue all acked records, then take the snapshot of offsets, and immediately flush offsets without waiting for the producer. And by using a concurrent queue, we don't even need to synchronize between the `sendRecords()` method adding to the back of the queue and the `commitOffsets()` pulling from the front of the queue.
   ```
       public boolean commitOffsets() {
           ...
           // Dequeue all submitted records that have been acknowledged
           while (!submittedRecords.isEmpty()) {
               SubmittedRecord next = submittedRecords.peek();
               if (!next.isAcknowledged()) {
                   // This record is not yet acknowledged, so we can't continue processing any more offsets
                   break;
               }
               submittedRecords.poll();
               // The record is acknowledged, so add the offsets to the offset writer
               // Offsets are converted & serialized in the OffsetWriter
               SourceRecord record = next.record();
               offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
           }
           ...
           synchronized (this) {
               boolean flushStarted = offsetWriter.beginFlush();
               if (!flushStarted) {
                   ...
               }
           }
   
           // Now we can actually flush the offsets to user storage.
           Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
               if (error != null) {
                   log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
               } else {
                   log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
               }
           });
           ...
       }
   ```
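The dequeue loop above can be exercised in isolation. The following is a minimal sketch, not the actual `WorkerSourceTask` code: `TrackedRecord` and `HeadDrain` are hypothetical names, and a plain partition/offset pair stands in for `SourceRecord` so that it runs without Kafka on the classpath. It assumes that only the commit thread removes elements from the head of the queue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for SubmittedRecord: a partition/offset pair plus an ack flag.
final class TrackedRecord {
    final String partition;
    final long offset;
    private final AtomicBoolean acked = new AtomicBoolean();

    TrackedRecord(String partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }

    void acknowledge() { acked.set(true); }          // would be called from the producer callback
    boolean isAcknowledged() { return acked.get(); } // would be called from the commit thread
}

final class HeadDrain {
    // Removes acked records from the head of the queue, stopping at the first unacked one,
    // and returns the latest offset seen per partition among the removed records.
    static Map<String, Long> drainAcked(Queue<TrackedRecord> submitted) {
        Map<String, Long> committable = new HashMap<>();
        TrackedRecord next;
        while ((next = submitted.peek()) != null && next.isAcknowledged()) {
            submitted.poll(); // safe: this thread is assumed to be the only consumer
            committable.put(next.partition, next.offset);
        }
        return committable;
    }

    public static void main(String[] args) {
        Queue<TrackedRecord> submitted = new ConcurrentLinkedQueue<>();
        TrackedRecord first = new TrackedRecord("P1", 1L);
        TrackedRecord second = new TrackedRecord("P1", 2L);
        submitted.add(first);
        submitted.add(second);
        second.acknowledge(); // acked out of order: nothing can be committed yet
        System.out.println(drainAcked(submitted));
        first.acknowledge();  // now both head records are acked
        System.out.println(drainAcked(submitted));
    }
}
```

Because the loop never waits on the producer, the time spent here depends only on how many acked records sit at the head of the queue when the commit runs.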
   
   I've shown the snippet above using a non-blocking queue of unlimited size. I think we could do this because the existing WorkerSourceTask logic already handles the possibility that the `producer.send(...)` blocks when its buffer is full, up to `max.block.ms` before throwing a retriable exception, and then retrying the send if needed. Since this happens on the same thread that calls `SourceTask.poll()`, this existing logic already has the backpressure that is based upon the producer setting and that prevents the source task getting too far ahead of the producer.
   
   Alternatively, we could use a blocking queue, but this would require an additional worker configuration, which is not ideal and can't be backported.
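For comparison, here is a rough sketch of what the blocking-queue alternative could look like. The capacity and the `maxBlockMs` timeout are assumptions standing in for the additional worker configuration mentioned above, and `BoundedSubmission` is a hypothetical helper, not existing Connect code. It relies only on the standard `BlockingQueue.offer(E, long, TimeUnit)` call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedSubmission {
    // Tries to enqueue the record for up to maxBlockMs. Returns false if the queue stayed
    // full (or the thread was interrupted), in which case the caller would back off and
    // retry the send, as the existing retry path in sendRecords() does.
    static <T> boolean trySubmit(BlockingQueue<T> queue, T record, long maxBlockMs) {
        try {
            return queue.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // Capacity 2 is an arbitrary illustrative value.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        System.out.println(trySubmit(queue, "r1", 10));
        System.out.println(trySubmit(queue, "r2", 10));
        System.out.println(trySubmit(queue, "r3", 10)); // queue is full at this point
    }
}
```

The bounded queue adds a second source of back pressure on top of the producer's own buffer, which is why it would need its own tuning knob.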
   
   @C0urante, WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582378783



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       > (Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd.
   
   Ok, that rings a bell. I think I see how the logic works now and I don't see an obvious way to make it simpler. Doing something finer-grained as you said might be the way to go. Anyway, I agree this is something to save for a follow-up improvement.
   
   > I think it's a necessary evil, since source task offset commits are conducted on a single thread. Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the cluster.
   
   Hmm.. This is suspicious. Why do we need to block the executor while we wait for the flush? Would it be simpler to let the worker source task finish the flush and the offset commit in its own event thread? We end up blocking the event thread anyway because of the need to do it under the lock.







[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910557464


   Also, with my proposed approach, the `offsetWriter` is only used (directly or indirectly) within the `commitOffsets()` method. That means we could actually do without the `synchronized(this)` block around the `offsetWriter.beginFlush()`.





[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-777865163


   Yeah, that's fair. I found https://github.com/apache/kafka/blob/2caef070d4199877337f59399563354e41b203f7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java#L550-L639 which seems applicable here. I'll refactor the tests to use a similar approach. Thanks Greg!





[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r578661623



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -1100,7 +1233,7 @@ private void assertPollMetrics(int minimumPollCountExpected) {
         double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
         assertEquals(0, activeCount, 0.000001d);
         if (minimumPollCountExpected > 0) {
-            assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+            assertEquals(activeCountMaxExpected, activeCountMaxExpected, 0.000001d);

Review comment:
       Ah yeah, good catch. Thanks!







[GitHub] [kafka] C0urante edited a comment on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-782174511


   Ah, gotcha! Yeah, `SourceTask::stop` isn't really a key part of the functional changes here or the tests added. The idea is just to prevent high-throughput source tasks whose producers are unable to keep up with them from entering a death spiral where they stop being able to commit offsets completely.
   
   This is because right now, when an offset commit attempt fails because the current batch of source records wasn't flushed to Kafka in time, all backlogged records that were read from `SourceTask::poll` during that time get added to the batch, which just compounds the problem.
   
   For example:
   1. The source task generates 10000 records and the worker starts sending those to Kafka.
   1. An offset commit is triggered on a separate thread and the worker waits for up to `offset.flush.timeout.ms` milliseconds for all of those records to be ack'd by the broker. In the meantime, on the main thread, the worker continues polling the source task for data.
   1. The worker times out while waiting for the original batch of 10000 records to be flushed; let's say that 5000 managed to be written successfully but 5000 remain unacknowledged. Additionally, during this time, the worker managed to poll an additional 1000 records from the task.
   
   At this point, the current behavior is:
   1. Abort the offset commit, and start a new batch consisting of the 5000 unacknowledged records from the previous batch and the 1000 records polled from the task during the failed offset commit attempt.
   1. Continue adding records to that batch until the next offset commit attempt is triggered.
   
   If the task is generating a steady throughput of 10000 records per offset commit attempt, and the worker's producer is only able to write 5000 of those before the offset commit attempt times out, the worker will never be able to successfully commit offsets for the task, even though there are plenty of records that have been sent to and ack'd by the broker.
   
   The proposed behavior in the PR is:
   1. Abort the offset commit, and keep the old batch of the 5000 unacknowledged records. Add the 1000 records polled during the failed offset commit attempt to a backlog.
   1. Continue adding newly-polled records from the task to that backlog.
   1. On the next offset commit attempt, only wait to flush out the records from the active batch (i.e., the 5000 unacknowledged records), and only write offsets for that batch.
   1. If successful, use the backlog of records as the new batch of records. Otherwise, keep the same batch of records and continue adding newly-polled records to the backlog.
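
The difference between the two behaviors can be illustrated with a toy model. This is a deliberately simplified sketch, not a description of the real timing: it assumes discrete rounds in which the task first polls a fixed number of records and the producer then acks a fixed number in FIFO order before the commit attempt is judged. The class and method names are hypothetical.

```java
final class CommitSimulation {
    // Current behavior (simplified): every unacked record belongs to the single batch
    // being flushed, so a commit succeeds only if nothing at all is outstanding.
    static int commitsWithGrowingBatch(int rounds, int polledPerRound, int ackedPerRound) {
        long outstanding = 0;
        int commits = 0;
        for (int round = 0; round < rounds; round++) {
            outstanding += polledPerRound;
            outstanding -= Math.min(ackedPerRound, outstanding);
            if (outstanding == 0) {
                commits++;
            }
        }
        return commits;
    }

    // Proposed behavior (simplified): after a failed attempt the active batch is frozen and
    // newly polled records go to a backlog; a commit succeeds once the active batch drains.
    static int commitsWithFrozenBatch(int rounds, int polledPerRound, int ackedPerRound) {
        long active = 0;
        long backlog = 0;
        boolean flushPending = false;
        int commits = 0;
        for (int round = 0; round < rounds; round++) {
            if (flushPending) {
                backlog += polledPerRound;
            } else {
                active += polledPerRound;
            }
            // The producer acks in FIFO order: the active batch first, then the backlog.
            long ackedActive = Math.min(ackedPerRound, active);
            active -= ackedActive;
            backlog -= Math.min(ackedPerRound - ackedActive, backlog);
            if (active == 0) {
                commits++;
                active = backlog; // the backlog becomes the next batch
                backlog = 0;
                flushPending = false;
            } else {
                flushPending = true;
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println(commitsWithGrowingBatch(14, 10000, 5000));
        System.out.println(commitsWithFrozenBatch(14, 10000, 5000));
    }
}
```

With 10000 records polled and 5000 acked per round, the first method never records a successful commit, while the second records commits in rounds 2, 6, and 14. The widening gap between commits in this model reflects the backlog growing while the active batch is being flushed.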
   
   





[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-915350922


   > I also think that the behavior with the suggested approach and your option 3 is still a lot better than the current situation.
   
   Agreed 👍 
   
   > IIUC the `offset.flush.timeout.ms` would actually not be used anymore, as there actually are no timeouts as the offset commit thread doesn't block anymore.
   
   That's mostly correct--we wouldn't be waiting on a blocking operation while iterating through the dequeue(s), although we might still choose to block on the actual write to the offset topic in the [same way that we currently](https://github.com/apache/kafka/blob/fb77da941ac2a34513cf2cd5d11137ba9b275575/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L565-L586) do just for the sake of metrics and allowing users to monitor the health of the connection between the Connect worker and the offsets topic. Not a huge deal though, and the point that we wouldn't be blocking on the task's producer is still valid.
   
   I think the issue is less that we'd end up timing out and more that we'd end up violating the guarantee that's provided right now by the framework that each task gets to take up only `offset.flush.timeout.ms` milliseconds per offset commit attempt before aborting the attempt and yielding control to the next task. A dequeue-based approach may actually be worse than the current behavior in that regard if there's no check in place to ensure that iterating over the dequeue doesn't exceed the offset flush timeout. Probably worth the tradeoff, but we can probably satisfy both objectives with your suggestion:
   
   > another option might be to incur the iteration on the worker source task thread.
   
   I think this'd be great, especially with the snapshotting logic you mention, which should basically eliminate any blocking between the two threads except to prevent race conditions while simple operations like clearing a hash map or assigning a new value to an instance variable take place.
   
   One thing that gave me pause initially was the realization that we'd be double-iterating over every source record at this point: once to transform, convert, and dispatch the record to the producer, and then once to verify that it had been acknowledged while iterating over the dequeue it's in. But I can't imagine it'd make a serious difference with CPU utilization given that transformation, conversion, and dispatching to a producer are likely to be at least an order of magnitude more expensive than just checking a boolean flag and possibly inserting the record's offset into a hash map. And memory utilization should be very close to the existing approach, which already tracks every single unacknowledged record in the `outstandingMessages` and `outstandingMessagesBacklog` fields.
   
   I think this buys us enough that my earlier-mentioned option 2 (multiple threads for offset commits) isn't called for, since the only blocking operation that would be performed during offset commit at this point is a write to the offsets topic. If the offsets topic is unavailable, it's likely that the impact would be the same across all tasks (unless the task is using a separate offsets topic, which will become possible once the changes for KIP-618 are merged), and even if not, things wouldn't be made any worse than they already are: the offset flush timeout would expire, and the next task in line would get its chance to commit offsets.
   
   @rhauch If this is all agreeable I think we're ready to start implementing. Since you've provided a lot of the code yourself I'm happy to let you take on that work if you'd like; otherwise, I'll get started and see if I can have a new PR with these changes out by early next week.





[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r583219431



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       It's mostly the flushing that concerns me, not really the offset commit. I don't think we need to make it synchronous, just that it seems silly to block that shared scheduler to complete it. My thought instead was to let the scheduler trigger the flush, but then let the task be responsible for waiting for its completion. While waiting, of course, it can continue writing to `outstandingMessagesBacklog`. So I don't think there should be any issue from a throughput perspective.







[GitHub] [kafka] rhauch edited a comment on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773


   My previous suggestion simply dequeues all completed records until an unacked record is found. This is really straightforward, but we could try to do better.
   
   We could dequeue all acked records except those whose source partition has an earlier unacked record. For example, let's say we have enqueued 10 records when `commitOffsets()` is called:
   ```
   1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
   2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This might happen if records 4, 5, and 7-10 were written to different topic partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter partitions.
   
   With the simplistic logic, we'd only dequeue records 1 and 2, we'd add the offsets for these two records to the offset writer, and we'd flush offset `partition=P1,offset=O2`. We'd end up with the following remaining in the queue (using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   There are quite a few records with source partition `P2` that were acked but not dequeued, simply because they were behind an unacked record with a different source partition.
   
   However, if we dequeue all acked records whose source partition does not match that of a previously seen unacked record, then we'd be able to dequeue more records and *also* flush offsets `partition=P1,offset=O2` and `partition=P2,offset=O5`. We'd end up with a much smaller queue (again, using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This minor change will dramatically improve the ability to commit offsets closer to what has actually been acked.
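
The partition-aware pass described above could be sketched roughly as follows. This is a single-threaded illustration under simplifying assumptions: `Submitted` and `PartitionAwareDrain` are hypothetical names, source partitions and offsets are plain strings, and an `ArrayList` stands in for the concurrent queue. The `example()` helper rebuilds the ten records listed earlier.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for SubmittedRecord.
final class Submitted {
    final int id;
    final String partition;
    final String offset;
    final boolean acked;

    Submitted(int id, String partition, String offset, boolean acked) {
        this.id = id;
        this.partition = partition;
        this.offset = offset;
        this.acked = acked;
    }
}

final class PartitionAwareDrain {
    // Removes every acked record whose source partition has no earlier unacked record,
    // and returns the latest committable offset per source partition.
    static Map<String, String> drain(List<Submitted> queue) {
        Map<String, String> committable = new LinkedHashMap<>();
        Set<String> blockedPartitions = new HashSet<>();
        Iterator<Submitted> it = queue.iterator();
        while (it.hasNext()) {
            Submitted next = it.next();
            if (!next.acked) {
                // Later records for this partition must wait for this one.
                blockedPartitions.add(next.partition);
            } else if (!blockedPartitions.contains(next.partition)) {
                committable.put(next.partition, next.offset);
                it.remove();
            }
        }
        return committable;
    }

    // The ten records from the example, in submission order.
    static List<Submitted> example() {
        List<Submitted> queue = new ArrayList<>();
        queue.add(new Submitted(1, "P1", "O1", true));
        queue.add(new Submitted(2, "P1", "O2", true));
        queue.add(new Submitted(3, "P1", "O3", false));
        queue.add(new Submitted(4, "P2", "O1", true));
        queue.add(new Submitted(5, "P2", "O2", true));
        queue.add(new Submitted(6, "P1", "O4", false));
        queue.add(new Submitted(7, "P2", "O3", true));
        queue.add(new Submitted(8, "P2", "O4", true));
        queue.add(new Submitted(9, "P2", "O5", true));
        queue.add(new Submitted(10, "P2", "O6", false));
        return queue;
    }

    public static void main(String[] args) {
        List<Submitted> queue = example();
        System.out.println(drain(queue));
        for (Submitted remaining : queue) {
            System.out.println(remaining.id);
        }
    }
}
```

Unlike the simpler loop, this pass visits every record in the queue on each commit, so its cost grows with the number of outstanding records rather than with the length of the acked prefix.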





[GitHub] [kafka] mimaison commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r578340374



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -1100,7 +1233,7 @@ private void assertPollMetrics(int minimumPollCountExpected) {
         double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
         assertEquals(0, activeCount, 0.000001d);
         if (minimumPollCountExpected > 0) {
-            assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+            assertEquals(activeCountMaxExpected, activeCountMaxExpected, 0.000001d);

Review comment:
       Should the right hand side be `activeCountMax`?







[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r589603947



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       I've been ruminating over this for a few days and I think it should be possible to make task offset commits independent of each other by changing the source task offset commit scheduler to use a multi-threaded executor instead of a global single-threaded executor for all tasks. This isn't quite the same thing as what you're proposing since tasks would still not be responsible for waiting for flush completion (the offset scheduler's threads would be), but it's a smaller change and as far as I can tell, the potential downsides only really amount to a few extra threads being created.
   
   The usage of [`scheduleWithFixedDelay`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-) already ensures that two offset commits for the same task won't be active at the same time, as it "Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next."
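
As a small illustration of that property, the sketch below schedules one periodic "commit" per task on a shared multi-threaded scheduler and records the highest number of simultaneously running commits seen for any single task. `FixedDelayDemo` and its parameters are hypothetical, and the 5 ms sleep is only a stand-in for a slow offset commit; this is not the Connect source task offset committer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedDelayDemo {
    // Returns the maximum number of overlapping executions observed for any one task.
    static int maxOverlapPerTask(int tasks, int threads, int runsPerTask) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(threads);
        AtomicInteger maxOverlap = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks * runsPerTask);
        try {
            for (int t = 0; t < tasks; t++) {
                AtomicInteger running = new AtomicInteger(); // in-flight commits for this task
                scheduler.scheduleWithFixedDelay(() -> {
                    int now = running.incrementAndGet();
                    maxOverlap.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // stand-in for a slow offset commit
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        done.countDown();
                    }
                }, 0, 1, TimeUnit.MILLISECONDS);
            }
            // Wait until the expected total number of executions has completed.
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return maxOverlap.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOverlapPerTask(3, 4, 5));
    }
}
```

Different tasks may run their commits in parallel on the pool, but each task's next execution is only scheduled after its previous one has finished.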
   
   Beyond that, the only concern that comes to mind is potential races caused by concurrent access of the offset backing store and its underlying resources.
   
   In distributed mode, the `KafkaOffsetBackingStore` and its usage of the underlying `KafkaBasedLog` appear to be thread-safe as everything basically boils down to calls to `Producer::send`, which should be fine.
   
   In standalone mode, the `MemoryOffsetBackingStore` handles all writes/reads of the local offsets file via a single-threaded executor, so concurrent calls to `MemoryOffsetBackingStore::set` should also be fine.
   
   
   Granted, none of this addresses your original concern, which is whether an offset commit timeout is necessary at all. In response to that, I think we may also want to revisit the offset commit logic and possibly do away with a timeout altogether. In sink tasks, for example, offset commit timeouts are almost a cosmetic feature at this point and are really only useful for metrics tracking. However, at the moment it's actually been pretty useful to us to monitor source task offset commit success/failure JMX metrics as a means of tracking overall task health. We might be able to make up the difference by relying on metrics for the number of active records, but it's probably not safe to make that assumption for all users, especially for what is intended to be a bug fix. So, if possible, I'd like to leave a lot of the offset commit logic intact as it is for the moment and try to keep the changes here minimal.
   
   
   To summarize: I'd like to proceed by keeping the currently-proposed changes, and changing the source task offset committer to use a multi-threaded executor instead of a single-threaded executor. I can file a follow-up ticket to track improvements in offset commit logic (definitely for source tasks, and possibly for sinks) and we can look into that if it becomes a problem in the future. What do you think?







[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582259574



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -98,7 +98,8 @@
     private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
     // A second buffer is used while an offset flush is running
     private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
+    private boolean recordFlushPending;
+    private boolean offsetFlushPending;
     private CountDownLatch stopRequestedLatch;

Review comment:
       nit: while we're at it, this could be `final`

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       If I understand it correctly, the main difference in this patch is that we no longer fail the flush if the messages cannot be drained quickly enough from `outstandingMessages`.  A few questions come to mind:
   
   1. Is the flush timeout still a useful configuration? Was it ever? Even if we time out, we still have to wait for the records that were sent to the producer.
   2. While we are waiting for `outstandingMessages` to be drained, we are still accumulating messages in `outstandingMessagesBacklog`. I imagine we can get into a pattern here once we fill up the accumulator. While we're waiting for `outstandingMessages` to complete, we fill `outstandingMessagesBacklog`. Once the flush completes, `outstandingMessagesBacklog` becomes `outstandingMessages` and we are stuck waiting again. Could this prevent us from satisfying the commit interval?
   
   Overall, I can't shake the feeling that this logic is more complicated than necessary. Why do we need the concept of flushing at all? It would be more intuitive to just commit whatever the latest offsets are. Note that we do not use `outstandingMessages` for the purpose of retries. Once a request has been handed off to the producer successfully, we rely on the producer to handle retries. Any delivery failure after that is treated as fatal. So does `outstandingMessages` serve any purpose other than tracking flushing? I am probably missing something here. It has been a long time since I reviewed this logic.
   
   







[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-781565662


   Yeah, the unit tests for the worker classes in general can be a little gnarly. The boilerplate segments (for things like setting up the converter, transformation chain, task context, headers, topic tracking, offset buffering, status tracking, and performing metrics assertions) are in line with the other tests in this class, though. The only difference with the new unit test here is that we want to test task behavior under some pretty fine-grained circumstances, which is accomplished right now by setting up latches and awaiting them to ensure that the task (which is running on a separate thread) has reached certain points in its lifecycle and not gone any further. If you have suggestions for how to improve that I'm all ears!
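
For readers unfamiliar with the latch technique mentioned above, here is a minimal, self-contained sketch of it. It is not taken from the actual `WorkerSourceTaskTest`; the class `LatchCoordinationSketch` and the "steps" counter are hypothetical. The idea is that the task thread signals one latch when it reaches a checkpoint and then waits on a second latch, so the test thread can make assertions while the task is guaranteed not to have gone any further.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of latch-based coordination between a test and a task thread.
class LatchCoordinationSketch {
    // Returns how many steps the task had completed when the test inspected it at the checkpoint.
    static int stepsObservedAtCheckpoint() {
        AtomicInteger steps = new AtomicInteger();
        CountDownLatch reachedCheckpoint = new CountDownLatch(1);
        CountDownLatch mayProceed = new CountDownLatch(1);

        Thread task = new Thread(() -> {
            steps.incrementAndGet();          // step 1: e.g. first poll and send
            reachedCheckpoint.countDown();    // tell the test we got this far
            try {
                mayProceed.await();           // and go no further until allowed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            steps.incrementAndGet();          // step 2: e.g. second poll
        });
        task.start();

        try {
            if (!reachedCheckpoint.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task never reached the checkpoint");
            }
            int observed = steps.get();       // the task is parked, so this value is stable
            mayProceed.countDown();
            task.join(5000);
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```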
   
   I'm not sure what you're referring to with task restart--that's not tested for at the moment, and any tests for that would likely be dependent on the success (or lack thereof) of any prior offset commit attempts, which are already tested for. Can you clarify or provide a brief example?





[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582351074



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
   1. I think it's a necessary evil, since source task offset commits are [conducted on a single thread](https://github.com/apache/kafka/blob/3f09fb97b6943c0612488dfa8e5eab8078fd7ca0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L64). Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the worker.
   2. This is definitely possible; I think the only saving grace here is that the combined size of the `outstandingMessages` and `outstandingMessagesBacklog` fields is going to be naturally throttled by the producer's buffer. If too many records are accumulated, the call to `Producer::send` will block synchronously until space is freed up, at which point the worker can continue polling the task for new records. This isn't ideal, as it will essentially cause the producer's entire buffer to be occupied until the throughput of record production from the task decreases and/or the write throughput of the producer rises to meet it, but it at least establishes an upper bound for how large a single batch of records in the `outstandingMessages` field ever gets. It may take several offset commit attempts for all of the records in that batch to be ack'd, with all but the last (successful) attempt timing out and failing, but forward progress with offset commits should still be possible.
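
To make the back-pressure argument in point 2 concrete, here is a toy model, not the real producer. The class `BackPressureSketch` and its methods `trySend` and `ack` are hypothetical; the real `Producer::send` blocks rather than returning `false`, and the real buffer is sized by `buffer.memory`. The sketch only illustrates that the number of unacknowledged bytes can never exceed the buffer size, which is what bounds the two outstanding-record maps.

```java
import java.util.concurrent.Semaphore;

// Hypothetical toy model of a bounded producer buffer; not the Kafka producer.
class BackPressureSketch {
    private final Semaphore freeBytes;

    BackPressureSketch(int bufferBytes) {
        this.freeBytes = new Semaphore(bufferBytes);
    }

    // Non-blocking stand-in for send(): returns false where the real call would block.
    boolean trySend(int recordBytes) {
        return freeBytes.tryAcquire(recordBytes);
    }

    // Stand-in for the broker acknowledging a record, which frees its buffer space.
    void ack(int recordBytes) {
        freeBytes.release(recordBytes);
    }

    int freeBytes() {
        return freeBytes.availablePermits();
    }
}
```

With a 100-byte buffer, one 60-byte record fits, a second does not until the first is acknowledged, and only then can the worker go back to polling the task.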
   
   I share your feelings about the complexity here. I think ultimately it arises from two constraints:
   1. A worker-global producer is used to write source offsets to the internal offsets topic right now. Although this doesn't necessarily require the single-threaded logic for offset commits mentioned above, things become simpler with it.
   2. (Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd.
   
   I don't think either of these points makes it impossible to add even more fine-grained offset commit behavior and/or remove offset commit timeouts, but the work involved would be a fair amount heavier than this relatively minor patch. If you'd prefer to see something along those lines, could we merge this patch for the moment and perform a more serious overhaul of the source task offset commit logic as a follow-up, possibly with a small design discussion on a Jira ticket to make sure there's alignment on the new behavior?







[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-914547745


   @C0urante, thanks for the feedback on my suggestion. I like your option 3, because it does allow the iteration to stop on each source partition as soon as it encounters the first unacknowledged record in each queue. I also think that the behavior with the suggested approach and your option 3 is still a lot better than the current situation. 
   
   One question, though: you mention that it might be a problem if iterating over the submitted records takes longer than `offset.flush.timeout.ms`. But IIUC `offset.flush.timeout.ms` would no longer be used at all: the offset commit thread doesn't block anymore, so there is nothing to time out. So, worst case, if task A has a ton of submitted records that have to be iterated over (e.g., fast producer and fast source task), it might slow the committing of offsets for other tasks. (Again, this is not any worse than the current behavior.) But your option 2 would help with this at the risk of using more threads, and so we may also want to consider this to help ensure that no slowly-proceeding producer of a task blocks other offset commits.
   
   Of course, another option might be to incur the iteration on the worker source task thread. That would essentially move the use of the queue(s) to the worker source task thread, though we would still need to get the offsets to the offset commit thread and so would likely have to keep the synchronization blocks around the offset writer snapshot. On one hand, that makes the offset thread super straightforward (snapshot and write); on the other, it puts more work, and the onus, on the worker source task thread.
   
   Thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-780718729


   The only consistent test failure for the last run appears unrelated to the changes here. See https://github.com/apache/kafka/pull/10140/checks?check_run_id=1917700907 and https://github.com/apache/kafka/pull/10077/checks?check_run_id=1877979511 for other instances of the same failure.
   
   ### org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithDefaultSettings
   ```
   org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Didn't find the topics [connect-storage-topic-connect-cluster-1, connect-config-topic-connect-cluster-1, connect-offset-topic-connect-cluster-1] ==> expected: <true> but was: <false>
   ```





[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582351074



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
   1. I think it's a necessary evil, since source task offset commits are [conducted on a single thread](https://github.com/apache/kafka/blob/3f09fb97b6943c0612488dfa8e5eab8078fd7ca0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L64). Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the worker.
   2. This is definitely possible; I think the only saving grace here is that the combined size of the `outstandingMessages` and `outstandingMessagesBacklog` fields is going to be naturally throttled by the producer's buffer. If too many records are accumulated, the call to `Producer::send` will block synchronously until space is freed up, at which point the worker can continue polling the task for new records. This isn't ideal, as it will essentially cause the producer's entire buffer to be occupied until the throughput of record production from the task decreases and/or the write throughput of the producer rises to meet it, but it at least establishes an upper bound for how large a single batch of records in the `outstandingMessages` field ever gets. It may take several offset commit attempts for all of the records in that batch to be ack'd, with all but the last (successful) attempt timing out and failing, but forward progress with offset commits should still be possible.
   
   I share your feelings about the complexity here. I think ultimately it arises from two constraints:
   1. A worker-global producer is used to write source offsets to the internal offsets topic right now. Although this doesn't necessarily require the single-threaded logic for offset commits mentioned above, things become simpler with it.
   2. (Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd.
   
   I don't think either of these points makes it impossible to add even more fine-grained offset commit behavior and/or remove offset commit timeouts, but the work involved would be a fair amount heavier than this relatively minor patch. If you'd prefer to see something along those lines, could we consider merging this patch for the moment and perform a more serious overhaul of the source task offset commit logic as a follow-up, possibly with a small design discussion on a Jira ticket to make sure there's alignment on the new behavior?







[GitHub] [kafka] mimaison commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-782156793


   I did not look at the tests closely yet but I thought `sourceTask.stop();` (https://github.com/apache/kafka/pull/10112/files#diff-b8da514e331b4d8bff623626ee31a45048b753911193b564a9b65f02b97f53c8R922) was a key part of the test. Maybe it isn't!
   
   I hope to take another look some time next week.





[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-918481881


   @rhauch (and, if interested, @hachikuji) new PR is up: https://github.com/apache/kafka/pull/11323





[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-782174511


   Ah, gotcha! Yeah, `SourceTask::stop` isn't really a key part of the functional changes here or the tests added. The idea is just to prevent high-throughput source tasks whose producers are unable to keep up with them from entering a death spiral where they stop being able to commit offsets completely.
   
   This is because right now, when an offset commit attempt fails because the current batch of source records wasn't flushed to Kafka in time, all backlogged records that were read from `SourceTask::poll` during that time get added to the batch, which just compounds the problem.
   
   For example:
   1. The source task generates 10000 records and the worker starts sending those to Kafka.
   1. An offset commit is triggered on a separate thread and the worker waits for up to `offset.flush.timeout.ms` milliseconds for all of those records to be ack'd by the broker. In the meantime, on the main thread, the worker continues polling the source task for data.
   1. The worker times out while waiting for the original batch of 10000 records to be flushed; let's say that 5000 managed to be written successfully but 5000 remain unacknowledged. Additionally, during this time, the worker managed to poll an additional 1000 records from the task.
   
   At this point, the current behavior is:
   1. Abort the offset commit, and start a new batch consisting of the 5000 unacknowledged records from the previous batch and the 1000 records polled from the task during the failed offset commit attempt.
   1. Continue adding records to that batch until the next offset commit attempt is triggered.
   
   If the task is generating a steady throughput of 10000 records per offset commit attempt, and the worker's producer is only able to write 5000 of those before the offset commit attempt times out, the worker will never be able to successfully commit offsets for the task, even though there are plenty of records that have been sent to and ack'd by the broker.
   
   The proposed behavior in the PR is:
   1. Abort the offset commit, and keep the old batch of the 5000 unacknowledged records. Add the 1000 records polled during the failed offset commit attempt to a backlog.
   1. Continue adding newly-polled records from the task to that backlog.
   1. On the next offset commit attempt, only wait to flush out the records from the active batch (i.e., the 5000 unacknowledged records), and only write offsets for that batch.
   1. If successful, use the backlog of records as the new batch of records. Otherwise, keep the same batch of records and continue adding newly-polled records to the backlog.
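
The difference between the two behaviors above can be sketched with a deliberately crude simulation. Everything here is an assumption made for illustration: the class `CommitSimulationSketch` and its methods are hypothetical, each loop iteration stands for one offset commit attempt, the task adds a fixed number of records per attempt, the producer acks a fixed number in FIFO order before the attempt times out, and producer buffer limits are ignored. It is not the `WorkerSourceTask` logic, only a model of it.

```java
// Hypothetical simulation of the two batching strategies; not WorkerSourceTask code.
class CommitSimulationSketch {
    // Current behavior: one batch holds every unacknowledged record, old and new.
    static int successesWithCurrentBehavior(int attempts, long producedPerAttempt, long ackedPerAttempt) {
        long outstanding = 0;
        int successes = 0;
        for (int i = 0; i < attempts; i++) {
            outstanding += producedPerAttempt;
            outstanding -= Math.min(ackedPerAttempt, outstanding);
            if (outstanding == 0) {
                successes++; // the whole batch was flushed in time
            }
        }
        return successes;
    }

    // Proposed behavior: after a failed attempt, keep retrying the same batch
    // and park newly polled records in a backlog.
    static int successesWithProposedBehavior(int attempts, long producedPerAttempt, long ackedPerAttempt) {
        long batch = 0;
        long backlog = 0;
        boolean flushPending = false;
        int successes = 0;
        for (int i = 0; i < attempts; i++) {
            if (flushPending) {
                backlog += producedPerAttempt;
            } else {
                batch += producedPerAttempt;
            }
            // Acks drain the older batch first, then the backlog.
            long ackedFromBatch = Math.min(ackedPerAttempt, batch);
            batch -= ackedFromBatch;
            backlog -= Math.min(ackedPerAttempt - ackedFromBatch, backlog);
            if (batch == 0) {
                successes++;      // offsets for the batch can be committed
                batch = backlog;  // the backlog becomes the new batch
                backlog = 0;
                flushPending = false;
            } else {
                flushPending = true;
            }
        }
        return successes;
    }
}
```

With 10000 records produced and 5000 acked per attempt, the first method never reports a successful commit, while the second does so intermittently. In this model the gaps between successes still grow over time because nothing bounds the backlog; in practice the producer's buffer would provide that bound.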
   
   





[GitHub] [kafka] C0urante edited a comment on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-782174511


   Ah, gotcha! Yeah, `SourceTask::stop` isn't really a key part of the functional changes here or the tests added. The idea is just to prevent high-throughput source tasks whose producers are unable to keep up with them from entering a death spiral where they stop being able to commit offsets completely.
   
   This is because right now, when an offset commit attempt fails because the current batch of source records wasn't flushed to Kafka in time, all backlogged records that were read from `SourceTask::poll` during that time get added to the batch, which just compounds the problem.
   
   For example:
   1. The source task generates 10000 records and the worker starts sending those to Kafka.
   1. An offset commit is triggered on a separate thread and the worker waits for up to `offset.flush.timeout.ms` milliseconds for all of those records to be ack'd by the broker. In the meantime, on the main thread, the worker continues polling the source task for data.
   1. The worker times out while waiting for the original batch of 10000 records to be flushed; let's say that 5000 managed to be written successfully but 5000 remain unacknowledged. Additionally, during this time, the worker managed to poll an additional 1000 records from the task.
   
   At this point, the current behavior is:
   1. Abort the offset commit, and start a new batch consisting of the 5000 unacknowledged records from the previous batch and the 1000 records polled from the task during the failed offset commit attempt.
   1. Continue adding newly-polled records to that batch until the next offset commit attempt is triggered.
   
   If the task is generating a steady throughput of 10000 records per offset commit attempt, and the worker's producer is only able to write 5000 of those before the offset commit attempt times out, the worker will never be able to successfully commit offsets for the task, even though there are plenty of records that have been sent to and ack'd by the broker.
   
   The proposed behavior in the PR is:
   1. Abort the offset commit, and keep the old batch of the 5000 unacknowledged records. Add the 1000 records polled during the failed offset commit attempt to a backlog.
   1. Continue adding newly-polled records from the task to that backlog.
   1. On the next offset commit attempt, only wait to flush out the records from the active batch (i.e., the 5000 unacknowledged records), and only write offsets for that batch.
   1. If successful, use the backlog of records as the new batch of records. Otherwise, keep the same batch of records and continue adding newly-polled records to the backlog.
   
   





[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773


   My previous suggestion simply dequeues all completed records until an unacked record is found. This is really straightforward, but we could try to do better.
   
   We could dequeue all records except those that have a source partition that has not been acked. For example, let's say we have enqueued 10 records when `commitOffsets()` is called:
   ```
   1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
   2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This might happen if records 4, 5, and 7-10 were written to different topic partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter partitions.
   
   With the simplistic logic, we'd only dequeue records 1 and 2, we'd add the offsets for these two records to the offset writer, and we'd flush offset `partition=P1,offset=O2`. But the connector has made significantly more progress on source partition `partition=P2`, and it would be good to *also* flush offset `partition=P2,offset=O5` (record 9, the last acked `P2` record ahead of the unacked record 10).
   
   We could do that by dequeuing acked records only if their source partition map does not match a previously unacked message. This way, we'd end up with the following remaining in the queue (using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This minor change will dramatically improve the ability to commit offsets closer to what has actually been acked.
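
The two dequeuing strategies described above can be compared side by side in a small, self-contained sketch. The names here are assumptions for illustration only: `DequeueSketch` and its nested `Submitted` class are hypothetical stand-ins (source partitions are plain strings and offsets plain integers rather than maps), and `exampleQueue()` rebuilds the ten-record example from this comment.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the two dequeuing strategies; not the Connect runtime code.
class DequeueSketch {
    static final class Submitted {
        final String partition;
        final int offset;
        final boolean acked;

        Submitted(String partition, int offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // The ten records from the example, in submission order.
    static Deque<Submitted> exampleQueue() {
        Deque<Submitted> queue = new ArrayDeque<>();
        queue.add(new Submitted("P1", 1, true));
        queue.add(new Submitted("P1", 2, true));
        queue.add(new Submitted("P1", 3, false));
        queue.add(new Submitted("P2", 1, true));
        queue.add(new Submitted("P2", 2, true));
        queue.add(new Submitted("P1", 4, false));
        queue.add(new Submitted("P2", 3, true));
        queue.add(new Submitted("P2", 4, true));
        queue.add(new Submitted("P2", 5, true));
        queue.add(new Submitted("P2", 6, false));
        return queue;
    }

    // Simple strategy: stop at the first unacknowledged record, whatever its partition.
    static Map<String, Integer> dequeueUntilFirstUnacked(Deque<Submitted> queue) {
        Map<String, Integer> committable = new LinkedHashMap<>();
        while (!queue.isEmpty() && queue.peek().acked) {
            Submitted next = queue.poll();
            committable.put(next.partition, next.offset);
        }
        return committable;
    }

    // Per-partition strategy: an unacknowledged record only blocks later records
    // that share its source partition.
    static Map<String, Integer> dequeuePerPartition(Deque<Submitted> queue) {
        Map<String, Integer> committable = new LinkedHashMap<>();
        Set<String> blockedPartitions = new HashSet<>();
        for (Iterator<Submitted> it = queue.iterator(); it.hasNext(); ) {
            Submitted next = it.next();
            if (next.acked && !blockedPartitions.contains(next.partition)) {
                it.remove();
                committable.put(next.partition, next.offset);
            } else {
                blockedPartitions.add(next.partition);
            }
        }
        return committable;
    }
}
```

Running `dequeuePerPartition` on `exampleQueue()` leaves records 3, 6, and 10 in the queue, matching the list above, whereas `dequeueUntilFirstUnacked` leaves everything from record 3 onward.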





[GitHub] [kafka] rhauch edited a comment on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773


   My previous suggestion simply dequeues all completed records until an unacked record is found. This is really straightforward, but we could try to do better.
   
   We could dequeue all records except those that have a source partition that has not been acked. For example, let's say we have enqueued 10 records when `commitOffsets()` is called:
   ```
   1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
   2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This might happen if records 4, 5, and 7-10 were written to different topic partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter partitions.
   
   With the simplistic logic, we'd only dequeue records 1 and 2, we'd add the offsets for these two records to the offset writer, and we'd flush offset `partition=P1,offset=O2`. We'd end up with the following remaining in the queue (using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   There are quite a few records with source partition `P2` that were acked but not dequeued, simply because they were behind an unacked record with a different source partition.
   
   However, if we dequeue all acked records whose source partition map does not match that of a previously seen unacked record, then we'd be able to dequeue more records and *also* flush offsets `partition=P1,offset=O2` and `partition=P2,offset=O5`. We'd end up with a much smaller queue (again, using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This minor change should dramatically improve our ability to commit offsets closer to what has actually been acked:
   ```
           // Dequeue all submitted records that have been acknowledged and don't have a source partition with an earlier unacknowledged record.
           // Iterate rather than peek()/poll(): peeking would never advance past the first unacknowledged record.
           // (Assumes the collection's iterator supports remove().)
           Set<Map<String, ?>> unackedPartitions = new HashSet<>();
           Iterator<SubmittedRecord> iter = submittedRecords.iterator();
           while (iter.hasNext()) {
               SubmittedRecord next = iter.next();
               SourceRecord record = next.record();
               Map<String, ?> partition = record.sourcePartition();
               if (next.isAcknowledged() && !unackedPartitions.contains(partition)) {
                   iter.remove();
                   // The record is acknowledged, so add the offsets to the offset writer
                   // Offsets are converted & serialized in the OffsetWriter
                   offsetWriter.offset(partition, record.sourceOffset());
               } else {
                   // As soon as we see an unacknowledged record, we have to prevent dequeuing all subsequent records that use that same partition
                   unackedPartitions.add(partition);
               }
           }
   ```
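   As a rough sanity check of the walkthrough above, here is a minimal, self-contained sketch that replays the ten-record example through both strategies. Everything in it is an assumption made for illustration: `Submitted` is a simplified stand-in (not the real `SubmittedRecord`), source partitions and offsets are plain strings rather than maps, and a `LinkedHashMap` stands in for the offset writer by keeping the last offset seen per partition.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.HashSet;
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class PartitionAwareDequeue {
       // Hypothetical, simplified stand-in for SubmittedRecord; not the Connect class.
       static final class Submitted {
           final int id;
           final String partition;
           final String offset;
           final boolean acked;

           Submitted(int id, String partition, String offset, boolean acked) {
               this.id = id;
               this.partition = partition;
               this.offset = offset;
               this.acked = acked;
           }
       }

       // The ten records from the example, in submission order.
       static Deque<Submitted> exampleQueue() {
           Deque<Submitted> q = new ArrayDeque<>();
           q.add(new Submitted(1, "P1", "O1", true));
           q.add(new Submitted(2, "P1", "O2", true));
           q.add(new Submitted(3, "P1", "O3", false));
           q.add(new Submitted(4, "P2", "O1", true));
           q.add(new Submitted(5, "P2", "O2", true));
           q.add(new Submitted(6, "P1", "O4", false));
           q.add(new Submitted(7, "P2", "O3", true));
           q.add(new Submitted(8, "P2", "O4", true));
           q.add(new Submitted(9, "P2", "O5", true));
           q.add(new Submitted(10, "P2", "O6", false));
           return q;
       }

       // Simple strategy: stop at the first unacked record.
       static Map<String, String> dequeueUntilFirstUnacked(Deque<Submitted> queue) {
           Map<String, String> committable = new LinkedHashMap<>();
           while (!queue.isEmpty() && queue.peek().acked) {
               Submitted r = queue.poll();
               committable.put(r.partition, r.offset); // later offsets overwrite earlier ones
           }
           return committable;
       }

       // Per-partition strategy: skip only partitions blocked by an earlier unacked record.
       static Map<String, String> dequeuePerPartition(Deque<Submitted> queue) {
           Map<String, String> committable = new LinkedHashMap<>();
           Set<String> blocked = new HashSet<>();
           Iterator<Submitted> it = queue.iterator();
           while (it.hasNext()) {
               Submitted r = it.next();
               if (r.acked && !blocked.contains(r.partition)) {
                   it.remove();
                   committable.put(r.partition, r.offset);
               } else {
                   blocked.add(r.partition);
               }
           }
           return committable;
       }

       // Record numbers still left in the queue.
       static List<Integer> ids(Deque<Submitted> queue) {
           List<Integer> result = new ArrayList<>();
           for (Submitted r : queue) {
               result.add(r.id);
           }
           return result;
       }

       public static void main(String[] args) {
           Deque<Submitted> simple = exampleQueue();
           System.out.println("simple offsets:    " + dequeueUntilFirstUnacked(simple));
           System.out.println("simple remaining:  " + ids(simple));

           Deque<Submitted> perPartition = exampleQueue();
           System.out.println("per-partition offsets:   " + dequeuePerPartition(perPartition));
           System.out.println("per-partition remaining: " + ids(perPartition));
       }
   }
   ```

   With the example data, the simple strategy leaves records 3 through 10 in the queue, while the per-partition strategy leaves only records 3, 6, and 10. Note that the per-partition pass always walks the whole queue, so its cost grows with the number of outstanding records.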





[GitHub] [kafka] C0urante closed pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

Posted by GitBox <gi...@apache.org>.
C0urante closed pull request #10112:
URL: https://github.com/apache/kafka/pull/10112


   

