Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/24 21:51:57 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582259574



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -98,7 +98,8 @@
     private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
     // A second buffer is used while an offset flush is running
     private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
+    private boolean recordFlushPending;
+    private boolean offsetFlushPending;
     private CountDownLatch stopRequestedLatch;

Review comment:
       nit: while we're at it, this could be `final`
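       A minimal sketch of that suggestion (everything beyond the field name quoted in the diff above is an assumption; the real constructor takes many more arguments):

           import java.util.concurrent.CountDownLatch;

           // Declaring the latch final means it must be assigned exactly once,
           // in the constructor, and can never be reassigned afterwards.
           class WorkerSourceTaskSketch {
               private final CountDownLatch stopRequestedLatch;

               WorkerSourceTaskSketch(/* other constructor arguments elided */) {
                   this.stopRequestedLatch = new CountDownLatch(1);
               }
           }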

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       If I understand it correctly, the main difference in this patch is that we no longer fail the flush if the messages cannot be drained quickly enough from `outstandingMessages`.  A few questions come to mind:
   
   1. Is the flush timeout still a useful configuration? Was it ever? Even if we time out, we still have to wait for the records that were sent to the producer.
   2. While we are waiting for `outstandingMessages` to be drained, we are still accumulating messages in `outstandingMessagesBacklog`. I imagine we can get into a repeating pattern here once the producer's accumulator fills up: while we're waiting for `outstandingMessages` to complete, we fill `outstandingMessagesBacklog`; once the flush completes, `outstandingMessagesBacklog` becomes `outstandingMessages` and we are stuck waiting again (see the sketch below). Could this prevent us from satisfying the commit interval?
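   A simplified sketch of the double-buffer hand-off described in point 2; this is illustrative only and does not reproduce the actual WorkerSourceTask code (class and method names here are assumptions):

           import java.util.IdentityHashMap;
           import org.apache.kafka.clients.producer.ProducerRecord;

           // Illustrative only: while a flush of `outstanding` is pending, newly
           // sent records accumulate in `backlog`. When the flush finishes, the
           // backlog is promoted to become the new outstanding set, so a steady
           // stream of records can leave the next flush waiting on a full buffer
           // again.
           class DoubleBufferSketch {
               private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstanding =
                       new IdentityHashMap<>();
               private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> backlog =
                       new IdentityHashMap<>();
               private boolean flushPending = false;

               synchronized void recordSent(ProducerRecord<byte[], byte[]> record) {
                   // During a pending flush, route new records to the backlog so
                   // the snapshot being flushed stays fixed.
                   if (flushPending) {
                       backlog.put(record, record);
                   } else {
                       outstanding.put(record, record);
                   }
               }

               synchronized void beginFlush() {
                   flushPending = true;
               }

               synchronized void finishFlush() {
                   // Promote the backlog: it becomes the outstanding set that the
                   // next flush will have to wait on.
                   outstanding = backlog;
                   backlog = new IdentityHashMap<>();
                   flushPending = false;
               }
           }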
   
   Overall, I can't shake the feeling that this logic is more complicated than necessary. Why do we need the concept of flushing at all? It would be more intuitive to just commit whatever the latest offsets are. Note that we do not use `outstandingMessages` for the purpose of retries: once a request has been handed off to the producer successfully, we rely on the producer to handle retries, and any delivery failure after that is treated as fatal. So does `outstandingMessages` serve any purpose other than tracking flushing? I am probably missing something here; it has been a long time since I reviewed this logic.
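   For comparison, a rough sketch of the simpler alternative floated above, where a commit just snapshots whatever the latest offsets are instead of gating on in-flight records. The class, method names, and the plain-interface offset store are all assumptions, not the connector runtime's actual API; the partition/offset map types are likewise simplified:

           import java.util.HashMap;
           import java.util.Map;

           // Hypothetical sketch of "just commit the latest offsets": keep only
           // the most recent offset per source partition and write that snapshot
           // on each commit, rather than tracking in-flight producer records.
           class LatestOffsetCommitter {
               private final Map<Map<String, Object>, Map<String, Object>> latestOffsets = new HashMap<>();

               // Called from the producer callback once a record has been acked;
               // later acks for the same partition simply overwrite the entry.
               synchronized void recordAcked(Map<String, Object> sourcePartition,
                                             Map<String, Object> sourceOffset) {
                   latestOffsets.put(sourcePartition, sourceOffset);
               }

               // Called on the commit interval: snapshot whatever is current and
               // hand it to some offset store (represented here by a plain
               // interface, not the runtime's OffsetStorageWriter).
               synchronized void commit(OffsetStore store) {
                   store.write(new HashMap<>(latestOffsets));
               }

               interface OffsetStore {
                   void write(Map<Map<String, Object>, Map<String, Object>> offsets);
               }
           }

   The key property the sketch tries to capture is that an offset is only recorded after the producer has acknowledged the corresponding record, which matches the observation above that delivery failures after hand-off are treated as fatal.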
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org