Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/01 18:04:54 UTC

[GitHub] [kafka] rhauch edited a comment on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910510910


   First of all, thanks for trying to fix this issue, @C0urante.
   
   And thanks for your insight, @hachikuji. I agree that it seems like we should not have to block the offset commits until the full batch of records has been written to Kafka.
   
   I suspect the current logic was written this way because it's the simplest thing to do, given that the source partition map and offset map in the source records are opaque, meaning we can't sort them and have to instead rely upon the order of the source records returned by the connector. And because the producer can make progress writing to some topic partitions while not making progress on others, it's possible that some records in a batch are written before earlier records in the same batch.
   
   The bottom line is that we have to track offsets that can be committed using only the order of the records that were generated by the source task. The current logic simply blocks committing offsets until each "batch" of records is completely flushed. That way we can commit _all_ of the offsets in the batch together, and let the offset writer rely upon ordering to use only the latest offset map for each partition map when we tell it to flush.
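To illustrate why ordering is enough, here is a minimal sketch, not the actual offset writer. It assumes the writer keeps pending offsets in a hash map keyed by the partition map; the class name `LatestOffsetSketch` and the `snapshot()` method are hypothetical. Opaque maps cannot be sorted, but they can be compared for equality, so applying offsets in record order leaves only the latest offset map for each partition map.

```java
import java.util.HashMap;
import java.util.Map;

public class LatestOffsetSketch {
    // Pending offsets keyed by the (opaque) source partition map.
    private final Map<Map<String, String>, Map<String, Long>> pending = new HashMap<>();

    // Called once per record, in the order the source task produced the records.
    // A later call for an equal partition map overwrites the earlier offset.
    void offset(Map<String, String> partition, Map<String, Long> offset) {
        pending.put(partition, offset);
    }

    // Copy of what would be flushed: one offset map per distinct partition map.
    Map<Map<String, String>, Map<String, Long>> snapshot() {
        return new HashMap<>(pending);
    }

    public static void main(String[] args) {
        LatestOffsetSketch writer = new LatestOffsetSketch();
        writer.offset(Map.of("file", "a.txt"), Map.of("pos", 10L));
        writer.offset(Map.of("file", "b.txt"), Map.of("pos", 5L));
        writer.offset(Map.of("file", "a.txt"), Map.of("pos", 30L));

        Map<Map<String, String>, Map<String, Long>> flushed = writer.snapshot();
        System.out.println(flushed.size());                                   // 2
        System.out.println(flushed.get(Map.of("file", "a.txt")).get("pos")); // 30
    }
}
```

The sketch only works if offsets are applied in source order, which is why the order of the records returned by the task is the one thing that has to be preserved.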
   
   But, flushing offsets requires synchronization, and the current logic switches between the `outstandingMessages` and `outstandingMessagesBacklog` buffers to track the "batches" of records that have to complete for offset commits. It's really sort of a mess.
   
   @hachikuji wrote:
   > The patch gets around the problem by relaxing `offset.flush.timeout.ms` a little bit. Rather than treating expiration of records as a fatal error, we continue to allow more time for `outstandingMessages` to be drained. This ensures that we do not have to wait for the messages from `outstandingMessagesBacklog` which are added while the flush is in progress.
   
   That's my understanding, too. And maybe I don't grasp the subtleties of the fix, but it seems like the fix won't necessarily help when a producer is _consistently_ slow. In such cases, `outstandingMessages` will fill with the records sent to the producer since the previous offset commit, and as soon as we start committing offsets, all subsequent records get added to `outstandingMessagesBacklog`. If the producer writes records significantly slower than the source task generates them, then `outstandingMessagesBacklog` could be larger than `outstandingMessages` by the time the offsets for `outstandingMessages` are finally committed, especially if we're blocking offset commits even longer with this change. So while we're able to eventually commit those first offsets, if the backlog is larger then it will likely take longer for the producer to flush those records than it took to flush the first batch. The offset commit thread remains blocked for longer and longer periods of time.
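A toy model makes the growth concrete. It is a deliberately simplified sketch, not a measurement: it assumes constant rates, assumes the producer drains records strictly in order, and ignores both the commit interval and the backpressure described next. The class `BacklogGrowthSketch` and its parameters are hypothetical.

```java
public class BacklogGrowthSketch {
    // Returns the size of the batch that each successive offset commit must wait for.
    // While a batch of n records is flushed at drainRate, the task keeps generating
    // records at generateRate, and those records form the next batch.
    static double[] batchSizes(double firstBatch, double generateRate, double drainRate, int commits) {
        double[] sizes = new double[commits];
        double batch = firstBatch;
        for (int i = 0; i < commits; i++) {
            sizes[i] = batch;
            double flushSeconds = batch / drainRate; // time the commit thread is blocked
            batch = generateRate * flushSeconds;     // backlog accumulated meanwhile
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Task generates 200 records/s, producer writes 100 records/s.
        for (double size : batchSizes(1000, 200, 100, 4)) {
            System.out.println(size); // 1000.0, 2000.0, 4000.0, 8000.0
        }
    }
}
```

Under these assumptions each batch is larger than the previous one by the ratio of the generate rate to the drain rate, so the blocked time grows by the same ratio with every commit.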
   
   Fortunately, we do have back pressure to keep this from getting too out of control: when the producer's buffer fills up, the worker source task's thread will block (up to `max.block.ms`) on calls to `producer.send(...)`, and the worker source task will retry any sends that fail after that timeout. And since this is the same thread that calls `poll()`, the worker source task will eventually slow its calls to `poll()`.
   
   But I think we can change how offsets are flushed such that we don't have to wait for the producer, and instead we can simply flush the latest offsets for records that have been successfully written at that point. We just need a different mechanism (other than the two "outstanding" lists and the flush-related flags) to track the offsets for the most recently written records.
   
   One way to do that is to use a single concurrent queue that bookkeeps records in the same order as generated by the source task, but in a way that allows us to track _which_ records have been acked and tolerates those records being acked in any order. 
   
   For example, we could replace the `outstandingMessages` and `outstandingMessagesBacklog` fields in `WorkerSourceTask` with something like this:
   ```
      private final Queue<SubmittedRecord> submittedRecords = new ConcurrentLinkedQueue<>();
   ```
   An element is appended to this queue just before the record is sent to the producer, and the `SubmittedRecord` class allows us to track which of these records has been acknowledged:
   ```
       protected static class SubmittedRecord {
           private final SourceRecord record;
           private final AtomicBoolean acked = new AtomicBoolean();
           public SubmittedRecord(SourceRecord sourceRecord) {
               record = Objects.requireNonNull(sourceRecord);
           }
           public void acknowledge() {
               acked.set(true);
           }
           public boolean isAcknowledged() {
               return acked.get();
           }
           public SourceRecord record() {
               return record;
           }
       }
   
   ```
   and where `acknowledge()` is called from the producer callback and the `commitOffsets()` method can safely call `isAcknowledged()` and `record()` from the commit thread. The `sendRecords()` method would add a `SubmittedRecord` to the end of the queue for each record that will be sent to the producer:
   ```
       private boolean sendRecords() {
           ...
           for (final SourceRecord preTransformRecord : toSend) {
               ...
               SubmittedRecord submittedRecord = new SubmittedRecord(record);
               if (!submittedRecords.offer(submittedRecord)) {
                   // If a blocking queue, then retry using the existing mechanism in WorkerSourceTask
                   log.warn("{} Failed to add record to buffer. Backing off before retrying", this);
                   toSend = toSend.subList(processed, toSend.size());
                   lastSendFailed = true;
                   counter.retryRemaining();
                   return false;
               }
               ...
   ```
   and then have the producer callback call the `SubmittedRecord.acknowledge()` method:
   ```
               try {
                   ...
                   producer.send(
                       producerRecord,
                       (recordMetadata, e) -> {
                           if (e != null) {
                               ...
                           } else {
                               submittedRecord.acknowledge();
                               ...
                           }
                       });
   ```
   This effectively replaces the `outstandingMessages` and `outstandingMessagesBacklog` fields and the `flushing` flag, and it simplifies the logic in `sendRecords()`, which no longer has to know which of those to use.
   
   Then here's the big change: in `commitOffsets()`, we can dequeue all acked records, then take the snapshot of offsets, and immediately flush offsets without waiting for the producer. And by using a concurrent queue, we don't even need to synchronize between the `sendRecords()` method adding to the back of the queue and the `commitOffsets()` method pulling from the front of the queue.
   ```
       public boolean commitOffsets() {
           ...
           // Dequeue all submitted records that have been acknowledged, in submission order
           while (!submittedRecords.isEmpty()) {
               SubmittedRecord next = submittedRecords.peek();
               if (!next.isAcknowledged()) {
                   // This record is not yet acknowledged, so we can't continue processing any more offsets
                   break;
               }
               submittedRecords.poll();
               // The record is acknowledged, so add the offsets to the offset writer
               // Offsets are converted & serialized in the OffsetWriter
               SourceRecord record = next.record();
               offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
           }
           ...
           synchronized (this) {
               boolean flushStarted = offsetWriter.beginFlush();
               if (!flushStarted) {
                   ...
               }
           }
   
           // Now we can actually flush the offsets to user storage.
           Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
               if (error != null) {
                   log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
               } else {
                   log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
               }
           });
           ...
       }
   ```
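The dequeue loop can be tried in isolation. The following is a minimal, self-contained sketch of that one step, with no Kafka dependencies: `AckedPrefixDemo`, `Submitted`, and `drainAcked` are hypothetical names, and a string label stands in for the `SourceRecord`. It assumes a single consumer (the commit thread), so the element returned by `peek()` is the one removed by `poll()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class AckedPrefixDemo {
    // Hypothetical stand-in for SubmittedRecord: a label plus an "acked" flag.
    static final class Submitted {
        final String label;
        private final AtomicBoolean acked = new AtomicBoolean();
        Submitted(String label) { this.label = label; }
        void acknowledge() { acked.set(true); }
        boolean isAcknowledged() { return acked.get(); }
    }

    // Removes and returns the longest acknowledged prefix of the queue, in
    // submission order. Stops at the first record that is not yet acknowledged,
    // even if later records have already been acknowledged.
    static List<String> drainAcked(Queue<Submitted> queue) {
        List<String> committable = new ArrayList<>();
        Submitted next;
        while ((next = queue.peek()) != null && next.isAcknowledged()) {
            queue.poll();
            committable.add(next.label);
        }
        return committable;
    }

    public static void main(String[] args) {
        Queue<Submitted> queue = new ConcurrentLinkedQueue<>();
        Submitted a = new Submitted("a");
        Submitted b = new Submitted("b");
        Submitted c = new Submitted("c");
        queue.add(a);
        queue.add(b);
        queue.add(c);

        c.acknowledge();                       // acked out of order
        System.out.println(drainAcked(queue)); // []
        a.acknowledge();
        System.out.println(drainAcked(queue)); // [a]
        b.acknowledge();
        System.out.println(drainAcked(queue)); // [b, c]
    }
}
```

Records acknowledged out of order simply wait in the queue until everything ahead of them has been acknowledged, so the offsets handed to the writer never skip past an unwritten record.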
   
   I've shown the snippet above using a non-blocking queue of unlimited size. I think we could do this because the existing WorkerSourceTask logic already handles the possibility that `producer.send(...)` blocks when its buffer is full, up to `max.block.ms` before throwing a retriable exception, and then retries the send if needed. Since this happens on the same thread that calls `SourceTask.poll()`, the existing logic already applies backpressure, governed by the producer settings, that prevents the source task from getting too far ahead of the producer.
   
   Alternatively, we could use a blocking queue, but this would require an additional worker configuration, which is not ideal and can't be backported.
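For comparison, a bounded queue would behave roughly as in the sketch below. It uses `ArrayBlockingQueue` from the JDK, whose `offer()` returns `false` instead of blocking when the queue is full; the class `BoundedBufferSketch`, the `trySubmit` helper, and the capacity of 2 are hypothetical, with the capacity standing in for the additional worker configuration mentioned above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedBufferSketch {
    // Tries to append a record without blocking. A false result is the signal
    // the caller would use to back off and retry the remaining records later.
    static <T> boolean trySubmit(BlockingQueue<T> buffer, T record) {
        return buffer.offer(record);
    }

    public static void main(String[] args) {
        // Hypothetical capacity; in practice this would come from configuration.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);

        System.out.println(trySubmit(buffer, "r1")); // true
        System.out.println(trySubmit(buffer, "r2")); // true
        System.out.println(trySubmit(buffer, "r3")); // false: buffer is full

        buffer.poll(); // the commit side removes an acknowledged record
        System.out.println(trySubmit(buffer, "r3")); // true
    }
}
```

With an unbounded `ConcurrentLinkedQueue`, `offer()` never returns `false`, so the retry branch would only matter for a bounded variant like this one.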
   
   @C0urante, WDYT?

