Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/13 18:53:50 UTC

[GitHub] [kafka] C0urante opened a new pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

C0urante opened a new pull request #11323:
URL: https://github.com/apache/kafka/pull/11323


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12226)
   
   Replaces https://github.com/apache/kafka/pull/10112
   
   Replaces the current batch-based logic for offset commits with a dynamic, non-blocking approach outlined in discussion on #10112 [here](https://github.com/apache/kafka/pull/10112#issuecomment-910510910), [here](https://github.com/apache/kafka/pull/10112#issuecomment-910540773), [here](https://github.com/apache/kafka/pull/10112#issuecomment-914348989), [here](https://github.com/apache/kafka/pull/10112#issuecomment-914547745), and [here](https://github.com/apache/kafka/pull/10112#issuecomment-915350922).
   
   Essentially, a deque is kept for every source partition that a source task produces records for, and each element in that deque is a `SubmittedRecord` with a flag to track whether the producer has ack'd the delivery of that source record to Kafka yet. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offsets from these elements, storing them in a snapshot that is then committed on the separate source task offset thread.
   
   The behavior of the `offset.flush.timeout.ms` property is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during `WorkerSourceTask::commitOffsets` blocking on the acknowledgment of records by the producer.
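
The per-partition deque idea described above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `TrackedRecord` and `OffsetTracker` are hypothetical names, and a `String` partition and `long` offset stand in for the `Map<String, Object>` source partition and offset that Connect actually uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a submitted record; not the actual Connect class.
class TrackedRecord {
    final String partition;          // stands in for the source partition map
    final long offset;               // stands in for the source offset map
    private volatile boolean acked;  // may be set from the producer callback thread

    TrackedRecord(String partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }

    void ack() {
        acked = true;
    }

    boolean isAcked() {
        return acked;
    }
}

// Hypothetical tracker: one deque per source partition, drained only from the head.
class OffsetTracker {
    private final Map<String, Deque<TrackedRecord>> records = new HashMap<>();

    // Enqueue a record before it is handed to the producer.
    TrackedRecord submit(String partition, long offset) {
        TrackedRecord record = new TrackedRecord(partition, offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(record);
        return record;
    }

    // Poll acked records off the head of each deque, keeping the last offset seen per partition.
    Map<String, Long> committableOffsets() {
        Map<String, Long> result = new HashMap<>();
        records.forEach((partition, deque) -> {
            while (!deque.isEmpty() && deque.peek().isAcked()) {
                result.put(partition, deque.poll().offset);
            }
        });
        // Drop empty deques so the map does not grow indefinitely
        records.values().removeIf(Deque::isEmpty);
        return result;
    }
}
```

In this sketch, an unacked record at the head of a deque holds back every later offset for the same partition, while other partitions can still advance.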
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720384889



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);

Review comment:
       Fine by me. My preferred style for that type of probing is to expose inner fields by making them package-private so I'll give that a stab; LMK if you think protected methods are significantly better.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer

Review comment:
       Ack, thanks.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r740365960



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #removeLastOccurrence(SubmittedRecord) removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #removeLastOccurrence removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * If multiple instances of the same {@link SubmittedRecord} have been submitted already, only the first one found
+     * (traversing from the end of the deque backward) will be removed.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     * @return whether an instance of the record was removed
+     */
+    public boolean removeLastOccurrence(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
+            return false;
+        }
+        boolean result = deque.removeLastOccurrence(record);
+        if (deque.isEmpty()) {
+            records.remove(record.partition());
+        }
+        if (!result) {
+            log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
+        }
+        return result;
+    }
+
+    /**
+     * Clear out any acknowledged records at the head of the deques and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);
+        return result;
+    }

Review comment:
       I agree with your concerns about excess logging if a message is added to the `WorkerSourceTask::execute` loop.
   
   Since we're removing [this log message](https://github.com/apache/kafka/blob/af8100b94fda4a27511797233e9845078ae8a69f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L510) in this PR, I wonder if we can replace it with something similar? I think users may want to know how many total pending (i.e., unacked) messages there are, how many deques there are, and the number of messages in the largest deque (which may be useful for identifying "stuck" topic partitions).
   
   I'll take a shot at this; LMKWYT.
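
As a rough illustration of the three numbers mentioned above (total pending records, number of deques, and size of the largest deque), here is a minimal sketch. `PendingStats` is a hypothetical helper, not something in this PR, and it counts every record still queued, including acked records that sit behind an unacked head.

```java
import java.util.Deque;
import java.util.Map;

// Hypothetical helper; the class and field names are illustrative only.
class PendingStats {
    final int totalPending;  // records still sitting in any deque
    final int dequeCount;    // number of source partitions with queued records
    final int largestDeque;  // size of the longest deque, a hint at a "stuck" partition

    PendingStats(int totalPending, int dequeCount, int largestDeque) {
        this.totalPending = totalPending;
        this.dequeCount = dequeCount;
        this.largestDeque = largestDeque;
    }

    // Walk every deque once and collect the three figures.
    static PendingStats of(Map<?, ? extends Deque<?>> records) {
        int total = 0;
        int largest = 0;
        for (Deque<?> deque : records.values()) {
            total += deque.size();
            largest = Math.max(largest, deque.size());
        }
        return new PendingStats(total, records.size(), largest);
    }

    @Override
    public String toString() {
        return totalPending + " pending messages across " + dequeCount
                + " deques; largest deque holds " + largestDeque;
    }
}
```

The result of `toString()` could be passed to a single log call at commit time, which would keep the message out of the hot `execute` loop.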







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r728086626



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -378,7 +370,7 @@ private boolean sendRecords() {
                             log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                             producerSendException.compareAndSet(null, e);

Review comment:
       > So any subsequent records that were sent to a different topic partition could still have the same source partition, and thus they should be enqueued into the same deque. Those offsets would not be committed, since their SubmittedRecord instances are after the SubmittedRecord for the record that failed to send, and the latter would never be acked (as its send failed).
   
   I think this is the "vital" section and it provides a good rationale for why we intentionally keep the failed record in the queue.
   
   > If the committed offsets were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible.
   
   We call `commitOffsets` in a `finally` block for `execute` right now. I think we can address this case by adding another call to `updateCommittableOffsets` right before this end-of-life call to `commitOffsets`. I've done this; LMKWYT.
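
The control flow described above might look roughly like the following. This is only a skeleton under stated assumptions: `TaskLoopSketch` is hypothetical, the method names echo the discussion, and the bodies are placeholders that record the call order instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton of a task loop; not the actual WorkerSourceTask.
class TaskLoopSketch {
    final List<String> calls = new ArrayList<>();
    private int iterations = 0;

    void execute() {
        try {
            while (sendRecords()) {
                updateCommittableOffsets();
            }
        } finally {
            // Take one last snapshot so records acked since the previous pass are included
            updateCommittableOffsets();
            // End-of-life commit, as in the existing finally block
            commitOffsets();
        }
    }

    // Placeholder: fails on the second call to simulate a producer send failure.
    private boolean sendRecords() {
        calls.add("sendRecords");
        if (++iterations == 2) {
            throw new IllegalStateException("simulated producer failure");
        }
        return true;
    }

    private void updateCommittableOffsets() {
        calls.add("updateCommittableOffsets");
    }

    private void commitOffsets() {
        calls.add("commitOffsets");
    }
}
```

Even when the loop exits with an exception, the final snapshot is taken before the last commit, so offsets for already-acked records still get a chance to be written.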







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r726483980



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly the current state. This
-            // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
-            // buffer; and the current set of user-specified offsets, stored in the
-            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish sending
-            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();

Review comment:
       Sure, can do.







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r733010990



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       Great points, @kkonstantine. You're right that using multiple deques does not quite mirror the existing behavior, whereas using a single deque would maintain the same behavior of committing offsets in the same order as they were supplied by the task, albeit more efficiently/effectively than the older behavior.
   
   I suspect that most connectors generate messages with proper source partition and source offset maps. However, if any don't then using multiple deques could result in different behavior. IOW, I think the risk is low, but non-zero. Using a single deque avoids that potential issue altogether.
   
   Given that this is a bug fix, I agree it's probably better to maintain as much behavior as possible while only fixing the undesirable behavior (in this case, blocking the offset commit thread). @C0urante did such a great job encapsulating the behavior that only the `SubmittedRecords` class would need to be changed/simplified.
   
   We can always reevaluate in the future whether to more opportunistically commit offsets for _different_ source partitions independent of the supplied record order (the order they were returned from the task), while committing offsets with the _same_ source partition in the supplied record order.







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r737931069



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       @C0urante, you mention above a case where using multiple deques is clearly superior:
   
   > If a subset of the topic partitions that a task is writing to is unavailable for a period of time, then writes to those topic partitions will start failing (and being infinitely retried, if the [default Connect producer properties](https://github.com/apache/kafka/blob/195ebff25b637bf09ce3283e204cfb1faffe34fc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L649-L654) are used). Writes to other topic partitions will still succeed, and (assuming the task does not perform some fairly-sophisticated filtering of how it polls from the source system based on invocations of `SourceTask::commitRecord`) the task will continue producing data destined for all topic partitions with no knowledge that writes to that affected subset are not yet succeeding.
   
   This is a fair point. I think the essential factor here is the ratio of records destined for the fast vs slow topic partitions. If that ratio is even moderately high, with a high throughput connector there still will likely be enough records destined for the slow topic partitions that the producer buffer (even if set high) will eventually fill up, causing the whole connector task to be throttled. Yes, there might be a lot of records, but (depending upon the ratio) there is a limit.
   
   What happens when this ratio is skewed to a very high value, such that nearly all of the records generated by the connector are destined for fast topic partitions? In that case, there might be such a small number of records destined for the slow topic partitions that the producer's buffer might never fill up. This condition could last for days. With a single deque, the worker would slowly make progress committing offsets, just in a bursty fashion. That could still leave hours and hours of offsets not committed for records destined for the fast topic partitions.
   
   I think the mistake I made before was assuming the latter case was more of an edge case, when it seems like it might not be that unusual. For example, consider a database source connector capturing rows from two tables, A and B. The producer happens to write records to topic A (from table A) very quickly, but struggles to write records to topic B (from table B). The ratio mentioned above might simply be the number of rows (or changes in rows) in each table. One could easily imagine table A having many millions of new/updated rows per day, while table B only has a handful of new/updated rows per day.
   
   The single deque approach would solve the core problem in many/most cases, but would fail to record progress via source offsets in some arguably common cases.
   
   In my mind, this changes the balance point. The multiple-deque solution as proposed in this PR would work regardless of the ratio, whereas the single-deque approach would not. And IMO, fixing the problem in most if not all cases outweighs the risk that a connector might be affected by the change in the order in which source offsets (for different source partitions) are committed.
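
For illustration only, here is a minimal sketch of the skewed-ratio case described above. It is not code from this PR: the `Rec` type, the method names, and the use of a plain string for the source partition are all assumptions made to keep the example small. One record for table B is submitted first and never acked, while every record for table A is acked.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; these are not the classes from the PR.
class DequeComparison {
    // A submitted record: source partition, source offset, and whether the producer has acked it.
    static final class Rec {
        final String partition;
        final long offset;
        boolean acked;
        Rec(String partition, long offset) { this.partition = partition; this.offset = offset; }
    }

    // Pops acked records from the head of the deque, stopping at the first unacked one,
    // and keeps the latest offset seen for each source partition.
    static void drainAcked(Deque<Rec> deque, Map<String, Long> committable) {
        while (!deque.isEmpty() && deque.peekFirst().acked) {
            Rec r = deque.pollFirst();
            committable.put(r.partition, r.offset);
        }
    }

    // Single-deque variant: all records share one queue in submission order.
    static Map<String, Long> committableWithSingleDeque(List<Rec> submitted) {
        Map<String, Long> committable = new HashMap<>();
        drainAcked(new ArrayDeque<>(submitted), committable);
        return committable;
    }

    // Multi-deque variant: one queue per source partition.
    static Map<String, Long> committableWithDequePerPartition(List<Rec> submitted) {
        Map<String, Deque<Rec>> deques = new LinkedHashMap<>();
        for (Rec r : submitted) {
            deques.computeIfAbsent(r.partition, p -> new ArrayDeque<>()).add(r);
        }
        Map<String, Long> committable = new HashMap<>();
        for (Deque<Rec> deque : deques.values()) {
            drainAcked(deque, committable);
        }
        return committable;
    }

    // Table B's only record is stuck unacked; table A's three records are all acked.
    static List<Rec> scenario() {
        List<Rec> records = new ArrayList<>();
        records.add(new Rec("B", 1));
        for (long i = 1; i <= 3; i++) {
            Rec r = new Rec("A", i);
            r.acked = true;
            records.add(r);
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(committableWithSingleDeque(scenario()));       // {}
        System.out.println(committableWithDequePerPartition(scenario())); // {A=3}
    }
}
```

In this toy scenario the single deque cannot commit anything while B's record sits unacked at its head, whereas the per-partition deques still let A's latest offset through.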




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#issuecomment-932434127


   Thanks @rhauch, I've addressed all of the comments that seemed straightforward and left a response on the ones where a bit of discussion seems warranted before making changes.





[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r741279732



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       👍 SGTM. I've updated the PR accordingly.
   
   One nit: the "flushing <n> outstanding messages for offset commit" message actually refers to the number of unacked messages in the current batch and not the number of acknowledged messages for which offsets will be committed; this has tripped up many of my colleagues who see "flushing 0 outstanding messages" and think their source connector isn't producing any data when all it really means is that its producers are keeping up with the throughput of its tasks very well.
   
   I think both pieces of information (number of acked and unacked messages) are useful here so I've included both in the latest draft.
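
As a rough sketch of how the pending-message figures in the log statement above might be derived: the field names below follow the accessors visible in the diff (`totalPendingMessages`, `numDeques`, `largestDequePartition`, `largestDequeSize`), but the class itself, its `of` factory, and the string partition keys are hypothetical and may not match what the PR actually does.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the pending-records summary; the real implementation may differ.
class PendingSummary {
    final int totalPendingMessages;
    final int numDeques;
    final String largestDequePartition;
    final int largestDequeSize;

    private PendingSummary(int total, int numDeques, String largestPartition, int largestSize) {
        this.totalPendingMessages = total;
        this.numDeques = numDeques;
        this.largestDequePartition = largestPartition;
        this.largestDequeSize = largestSize;
    }

    // Returns null when nothing is pending, mirroring the null check in the diff.
    static PendingSummary of(Map<String, Deque<Long>> unackedByPartition) {
        int total = 0;
        int nonEmpty = 0;
        String largest = null;
        int largestSize = 0;
        for (Map.Entry<String, Deque<Long>> entry : unackedByPartition.entrySet()) {
            int size = entry.getValue().size();
            if (size == 0) {
                continue; // empty deques contribute nothing to the summary
            }
            total += size;
            nonEmpty++;
            if (size > largestSize) {
                largestSize = size;
                largest = entry.getKey();
            }
        }
        return total == 0 ? null : new PendingSummary(total, nonEmpty, largest, largestSize);
    }

    // Example input: three unacked records for one source partition, one for another.
    static Map<String, Deque<Long>> sample() {
        Map<String, Deque<Long>> unacked = new LinkedHashMap<>();
        unacked.put("table-a", new ArrayDeque<>(Arrays.asList(7L, 8L, 9L)));
        unacked.put("table-b", new ArrayDeque<>(Collections.singletonList(3L)));
        return unacked;
    }

    public static void main(String[] args) {
        PendingSummary s = of(sample());
        System.out.println(s.totalPendingMessages + " pending across " + s.numDeques
                + " source partitions; largest is " + s.largestDequePartition
                + " with " + s.largestDequeSize);
    }
}
```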







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720382154



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.

Review comment:
       That's not actually the case; [`Deque::removeLastOccurrence`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#removeLastOccurrence-java.lang.Object-) differs from [`Deque::removeLast`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#removeLast--) and [`Deque::pollLast`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#pollLast--) in that it can and will remove an element from anywhere in the deque (as opposed to just the element at the very end) if it `equals` the one passed to `removeLastOccurrence`. The reason `removeLastOccurrence` is used here instead of the more-common [`remove`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#remove-java.lang.Object-) is that, at least with the current use of `SubmittedRecords` by the `WorkerSourceTask`, any calls to `SubmittedRecords::remove` will pass in the most-recently-submitted record, which will then be at the end of the deque it was inserted into, so a search starting from the tail finds it immediately and there's a (potentially-substantial) performance gain.
   
   I wonder if it might be helpful to alter this method:
   - By renaming to `removeLastOccurrence`
   - By clarifying in the Javadoc that, if multiple instances of the same `SubmittedRecord` have been submitted already, only the first one found (traversing from the end of the deque backward) will be removed
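
To make the difference between the three `Deque` methods concrete, here is a small standalone demo using only the JDK. The class name and the string elements are made up for the example; in `SubmittedRecords` the elements would be `SubmittedRecord` instances.

```java
import java.util.Deque;
import java.util.LinkedList;

// Standalone demo of java.util.Deque removal methods; not code from the PR.
class DequeRemovalDemo {
    // Builds [a, b, a, c] so that "a" appears twice.
    static Deque<String> sample() {
        Deque<String> deque = new LinkedList<>();
        deque.add("a");
        deque.add("b");
        deque.add("a");
        deque.add("c");
        return deque;
    }

    public static void main(String[] args) {
        Deque<String> d1 = sample();
        d1.removeLastOccurrence("a"); // searches from the tail; removes the second "a"
        System.out.println(d1);       // [a, b, c]

        Deque<String> d2 = sample();
        d2.remove("a");               // equivalent to removeFirstOccurrence; searches from the head
        System.out.println(d2);       // [b, a, c]

        Deque<String> d3 = sample();
        d3.removeLast();              // unconditionally removes the tail element, whatever it is
        System.out.println(d3);       // [a, b, a]
    }
}
```

When the element to remove is the most recently added one, `removeLastOccurrence` finds it on the first comparison, while `remove` would walk the whole deque from the head.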







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r724430246



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #remove(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record for partition {}, but no records with that partition are present", record.partition());

Review comment:
       IIUC this is really an unexpected condition, since it's single-threaded and this method is called only in the catch block after sending a record to the producer. But without that context, anyone reading this message in the log file might be confused or even concerned about what "remove record for partition..." means. WDYT about mentioning more of that context, something like:
   ```suggestion
               log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly the current state. This
-            // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
-            // buffer; and the current set of user-specified offsets, stored in the
-            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish sending
-            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();

Review comment:
       In any of the cases below where we call `offsetWriter.cancelFlush()`, it appears that we're relying upon the offset writer to keep the offsets that it was unable to flush -- we're always computing new offsets at this point. 
   
   WDYT about adding a comment above line 474, something along the lines of:
   ```
    // Update the offset writer with any new offsets for records that have been acked.
    // The offset writer will continue to track all offsets until they are able to be successfully flushed.
    // IOW, if the offset writer fails to flush, it keeps those offsets for the next attempt,
    // though we may update them here with newer offsets for acked records.
   ```
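
A simplified model of the behavior this comment relies on, for readers unfamiliar with it. The class below is hypothetical and is not Kafka's `OffsetStorageWriter`; its method names merely echo `beginFlush()` and `cancelFlush()`, and `completeFlush()`, `pendingOffsets()`, and the string partition keys are assumptions for the sketch. It shows a buffer that keeps offsets after a cancelled flush while letting newer offsets win.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model; not Kafka's OffsetStorageWriter.
class RetainingOffsetBuffer {
    private Map<String, Long> pending = new HashMap<>(); // offsets waiting for the next flush
    private Map<String, Long> flushing = null;           // snapshot of an in-progress flush

    void offset(String partition, long offset) {
        pending.put(partition, offset);
    }

    // Snapshots the pending offsets; returns false if there is nothing to flush.
    boolean beginFlush() {
        if (flushing != null) {
            throw new IllegalStateException("flush already in progress");
        }
        if (pending.isEmpty()) {
            return false;
        }
        flushing = pending;
        pending = new HashMap<>();
        return true;
    }

    // Flush failed: fold the snapshot back in, letting any newer offsets win.
    void cancelFlush() {
        if (flushing == null) {
            return;
        }
        flushing.putAll(pending);
        pending = flushing;
        flushing = null;
    }

    // Flush succeeded: the snapshot is no longer needed.
    void completeFlush() {
        flushing = null;
    }

    // Sorted copy of the offsets that the next flush would write.
    Map<String, Long> pendingOffsets() {
        return new TreeMap<>(pending);
    }

    public static void main(String[] args) {
        RetainingOffsetBuffer buffer = new RetainingOffsetBuffer();
        buffer.offset("A", 1L);
        buffer.offset("B", 5L);
        buffer.beginFlush();
        buffer.offset("A", 2L);  // a newer offset arrives while the flush is in flight
        buffer.cancelFlush();    // e.g. the flush timed out
        System.out.println(buffer.pendingOffsets()); // {A=2, B=5}
    }
}
```

Under this model, replacing the caller's map of offsets on every commit attempt is safe, because anything that failed to flush is still held by the buffer for the next attempt.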

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -570,22 +512,21 @@ public boolean commitOffsets() {
             // could look a little confusing.
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();

Review comment:
       So
   > Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets
   
   Sounds good. 
   
   BTW, the fact that the offset writer continues to track the offsets after failed flush attempts is a subtle thing, and it's worth calling out above (see my comment on lines 476-477) to help explain why we can always replace the `offsetsToCommit` map even after the offset writer failed to flush.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "adifferentvalue");
+    private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEmptyRecords();
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset);
+        // Record has been submitted but not yet acked; cannot commit offsets for it yet
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        submittedRecord.ack();
+        // Record has been acked; can commit offsets for it
+        assertEquals(Collections.singletonMap(PARTITION1, offset), submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record2.ack();
+        // One record has been ack'd, but a record that comes before it and corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition2Record1.ack();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveLastSubmittedRecord() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset());
+        submittedRecords.remove(submittedRecord);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveNotLastSubmittedRecord() {
+        Map<String, Object> partition1Offset = newOffset();
+        Map<String, Object> partition2Offset = newOffset();
+
+        SubmittedRecord recordToRemove = submittedRecords.submit(PARTITION1, partition1Offset);
+        SubmittedRecord lastSubmittedRecord = submittedRecords.submit(PARTITION2, partition2Offset);
+
+        assertNoEmptyDeques();
+
+        submittedRecords.remove(recordToRemove);
+
+        assertNoEmptyDeques();
+        // The only record for this partition has been removed; we shouldn't be tracking a deque for it anymore
+        assertRemovedDeques(PARTITION1);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        recordToRemove.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        lastSubmittedRecord.ack();
+        // Now that the last-submitted record has been ack'd, we should be able to commit its offset
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset), submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();

Review comment:
       Also?
   ```suggestion
           assertEmptyRecords();
           assertNoEmptyDeques();
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       Actually, I now have a question: why did you choose to add it _before_ the `poll()` (a few lines down) rather than after, perhaps after the `if (!sendRecords()) {...}` block below?
   
   The reason I ask is that if one iteration of the while loop polls for records and sends them (so that they are handed to the producer and asynchronously acked), but the connector is paused at about the same time, then the offsets for those records will not be committed until after the connector is resumed by the pause handling above. Is that intentional?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.

Review comment:
       > * By clarifying in the Javadoc that, if multiple instances of the same SubmittedRecord have been submitted already, only the first one found (traversing from the end of the deque backward) will be removed
   
   I think this would be great.
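For readers unfamiliar with the `java.util.Deque` semantics being documented here, a minimal sketch follows. It uses plain strings as stand-ins for `SubmittedRecord` instances (an assumption made for illustration; the class name `RemoveLastOccurrenceSketch` and its helper are hypothetical and not part of the PR), and shows that `removeLastOccurrence` drops only the match closest to the tail of the deque.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration; strings stand in for SubmittedRecord instances.
class RemoveLastOccurrenceSketch {

    // Copies the input into a deque, removes the last element equal to target,
    // and returns what is left in submission order.
    static List<String> removeLast(List<String> submitted, String target) {
        Deque<String> deque = new LinkedList<>(submitted);
        // Traverses from the tail backward and removes at most one element.
        deque.removeLastOccurrence(target);
        return new ArrayList<>(deque);
    }

    public static void main(String[] args) {
        // "a" was submitted twice; only the later one is removed.
        System.out.println(removeLast(Arrays.asList("a", "b", "a", "c"), "a")); // prints [a, b, c]
    }
}
```

The earlier occurrence stays in place, so it would still hold back offsets behind it until it is acknowledged or removed itself.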

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly the current state. This
-            // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
-            // buffer; and the current set of user-specified offsets, stored in the
-            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish sending
-            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();
+        }
 
-            if (!flushStarted) {
-                // There was nothing in the offsets to process, but we still waited for the data in the
-                // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.
-                // flush time, which can be used for monitoring even if the connector doesn't record any
-                // offsets.
-                finishSuccessfulFlush();
-                long durationMillis = time.milliseconds() - started;
-                recordCommitSuccess(durationMillis);
-                log.debug("{} Finished offset commitOffsets successfully in {} ms",
-                        this, durationMillis);
-
-                commitSourceTask();
-                return true;
-            }
+        offsetsToCommit.forEach(offsetWriter::offset);
+        if (!offsetWriter.beginFlush()) {
+            // There was nothing in the offsets to process, but we still waited for the data in the
+            // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.

Review comment:
       What do you mean by "we still waited for the data in the buffer to flush"? The `beginFlush()` method doesn't actually do any flushing; it merely performs the snapshot of the offset writer's data.
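The snapshot step in the new `commitOffsets()` hunk above can be sketched in isolation. This is a simplified illustration under stated assumptions: the class `OffsetSnapshotSketch`, its `String`/`Long` map types, and the method names are hypothetical, whereas the real code uses `Map<Map<String, Object>, Map<String, Object>>` and hands the snapshot to the offset writer. The point it shows is that the lock is held only long enough to swap maps, so nothing waits on producer acknowledgments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the swap performed in commitOffsets().
class OffsetSnapshotSketch {
    private Map<String, Long> committableOffsets = new HashMap<>();

    // Called on the task thread whenever newer acknowledged offsets are found.
    synchronized void record(String partition, long offset) {
        committableOffsets.put(partition, offset);
    }

    // Called on the offset commit thread: swap in a fresh map and return the old one.
    Map<String, Long> snapshot() {
        Map<String, Long> toCommit;
        synchronized (this) {
            toCommit = committableOffsets;
            committableOffsets = new HashMap<>();
        }
        // Writing toCommit to storage can happen here, outside the lock.
        return toCommit;
    }

    public static void main(String[] args) {
        OffsetSnapshotSketch sketch = new OffsetSnapshotSketch();
        sketch.record("p1", 5L);
        System.out.println(sketch.snapshot()); // the offsets recorded so far
        System.out.println(sketch.snapshot()); // empty until more offsets are recorded
    }
}
```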




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r732984001



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       The choice of having a map of deques here has some interesting implications. 
   
   For cases with multiple partitions, a failure to deliver a produced record within a batch would not stop offsets from being committed for subsequent records (that have been produced and ack'ed successfully), as long as those records belong to a different partition than the record that hasn't been ack'ed and will eventually fail to be produced. That's a valid choice; however, the main disadvantage I see is that this new logic doesn't always map to the existing behavior. In the existing code, which is based on two queues of outstanding messages, once a record is not ack'ed, offsets for subsequent successful records also stop being committed. 
   
   To match the existing behavior with this new and improved non-blocking implementation, it seems to me that we'd only need to use a common global `Deque` instead of this map, and stop removing elements once we encounter the first submitted record that has neither been ack'ed nor failed with a retriable exception (like you do for a single entry and a single deque now). 
   
   To summarize a comparison of the two options: 
   * with a global deque, we preserve the current semantics and behavior, make fewer allocations of collections and maps, and keep the code a bit simpler. 
   * with a map of deques, we avoid a few duplicates by committing offsets of successfully produced records within a batch.
   
   With the above in mind, I wonder if using a single deque is preferable here, most importantly to preserve the existing behavior while getting the benefit of unblocking the offset commit in the presence of failures. Wdyt @C0urante @rhauch ?
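To make the single-deque option concrete, here is a minimal sketch. It is not code from the PR: the class `GlobalDequeSketch`, the nested `Tracked` type, and the use of `String` partitions and `long` offsets are assumptions made for illustration. It drains acknowledged entries from the head of one shared deque and stops at the first entry that has not been acknowledged, regardless of which source partition that entry belongs to.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "single global deque" option.
class GlobalDequeSketch {

    static final class Tracked {
        final String partition;
        final long offset;
        // In a real implementation this flag would be set from the producer callback thread.
        volatile boolean acked;

        Tracked(String partition, long offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Removes acked entries from the head and returns the latest offset seen per partition.
    // Stops at the first unacked entry, even if later entries are already acked.
    static Map<String, Long> committableOffsets(Deque<Tracked> records) {
        Map<String, Long> offsets = new HashMap<>();
        while (!records.isEmpty() && records.peekFirst().acked) {
            Tracked head = records.pollFirst();
            offsets.put(head.partition, head.offset);
        }
        return offsets;
    }

    static Map<String, Long> demo() {
        Deque<Tracked> records = new ArrayDeque<>();
        records.add(new Tracked("p1", 1L, true));
        records.add(new Tracked("p2", 1L, false)); // not yet acked: blocks everything behind it
        records.add(new Tracked("p1", 2L, true));
        return committableOffsets(records);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // only the offset for the first p1 record is committable
    }
}
```

In this sketch the second `p1` record stays in the deque behind the unacked `p2` record, which mirrors the ordering guarantee of the older batch-based code.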







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r733010990



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       Great points, @kkonstantine. You're right that using multiple deques does not quite mirror the existing behavior, whereas using a single deque would maintain the same behavior of committing offsets in the same order as they were supplied by the task, albeit much more efficiently than the older behavior.
   
   I suspect that most connectors generate messages with proper source partition and source offset maps. However, if any don't, then using multiple deques could result in different behavior. IOW, I think the risk is low, but non-zero. Using a single deque avoids that potential issue altogether.
   
   Given that this is a bug fix, I agree it's probably better to maintain as much behavior as possible while only fixing the undesirable behavior (in this case, blocking the offset commit thread). @C0urante did such a great job encapsulating the behavior that only the `SubmittedRecords` class would need to be changed/simplified.
   
   We can always reevaluate in the future whether to more opportunistically commit offsets for _different_ source partitions independent of the supplied record order (the order they were returned from the task), while committing offsets with the _same_ source partition in the supplied record order.







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r727228287



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, the latest-eligible offsets for each source partition can be retrieved, where every
+ * record up to and including the record for each returned offset has been either
+ * {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.

Review comment:
       Since I'm asking about another change, I think there is a copy-paste grammatical error in this sentence we could fix.
   ```suggestion
    * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
    * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
    * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -378,7 +370,7 @@ private boolean sendRecords() {
                             log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                             producerSendException.compareAndSet(null, e);

Review comment:
       We're not calling `submittedRecords.removeLastOccurrence(submittedRecord)` here. Were you thinking that we're setting the `producerSendException`, which will cause the `execute()` method to throw this same exception on the next pass and consequently fail the task? 
   
   I think that's the right choice and no changes are required, but I do need to work through it. So pardon my thought process here.
   
   The question is: what happens to records (and `SubmittedRecord` objects and their offsets) that appear after the record that resulted in the asynchronous exception?
   
   What happens depends on what the producer behavior is, or might be in the future. IIRC the exceptions will often be unrecoverable, but it is possible that records could be sent successfully even if they were submitted to the producer _after_ the record that failed, especially when those records were sent to a different topic partition and were actually sent by the producer _before_ the record that failed. After all, from the [producer.send() JavaDoc](https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L839):
   > Callbacks for records being sent to the same partition are guaranteed to execute in order.
   
   Unfortunately, we cannot infer a relationship between the topic partition for a record and its source partition. So any subsequent records that were sent to a different topic partition could still have the same source partition, and thus they should be enqueued into the same deque. Those offsets would not be committed, since their `SubmittedRecord` instances are after the `SubmittedRecord` for the record that failed to send, and the latter would never be acked (as its send failed).
   
   But if any subsequent records were sent to a different topic partition but had a _different_ source partition, their `SubmittedRecord` instances would be in a different deque than the `SubmittedRecord` for the record that failed to send, and their offsets _could_ potentially be committed.
   
   If the committed offsets were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible.
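The two cases described above (same source partition versus different source partition) can be illustrated with a small sketch. Everything here is hypothetical and simplified: `PerPartitionDequeSketch`, the nested `Tracked` type, and the `String`/`long` types are stand-ins chosen for illustration, not the PR's `SubmittedRecords` implementation. A record whose send failed is modeled as one that is never acked.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of one deque per source partition.
class PerPartitionDequeSketch {

    static final class Tracked {
        final long offset;
        volatile boolean acked; // would be set from the producer callback in real code

        Tracked(long offset, boolean acked) {
            this.offset = offset;
            this.acked = acked;
        }
    }

    // For each source partition, drains acked entries from the head of its deque
    // and reports the latest drained offset. An unacked head blocks only its own deque.
    static Map<String, Long> committableOffsets(Map<String, Deque<Tracked>> records) {
        Map<String, Long> offsets = new HashMap<>();
        for (Map.Entry<String, Deque<Tracked>> entry : records.entrySet()) {
            Deque<Tracked> deque = entry.getValue();
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                offsets.put(entry.getKey(), deque.pollFirst().offset);
            }
        }
        return offsets;
    }

    static Map<String, Long> demo() {
        Deque<Tracked> a = new ArrayDeque<>();
        a.add(new Tracked(1L, false)); // send failed asynchronously, so it is never acked
        a.add(new Tracked(2L, true));  // acked, but stuck behind the failed record
        Deque<Tracked> b = new ArrayDeque<>();
        b.add(new Tracked(7L, true));  // different source partition, acked

        Map<String, Deque<Tracked>> records = new HashMap<>();
        records.put("A", a);
        records.put("B", b);
        return committableOffsets(records);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // partition B is committable, partition A is not
    }
}
```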

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -250,6 +248,9 @@ public void execute() {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
+
+                updateCommittableOffsets();
+

Review comment:
       Sorry, maybe I wasn't clear in my [previous comment about this call](https://github.com/apache/kafka/pull/11323#discussion_r724437761). I think there is an edge case here that we could deal with a bit better. Consider the following scenario as we walk through the loop in `execute()`. The WorkerSourceTask is not paused, and has been sending and committing offsets for records.
   
   On some pass through the `execute()` while loop:
   1. `shouldPause()` returns false
   2. `maybeThrowProducerSendException()` does nothing since no exception was set from the producer
   3. `poll()` is called to get new records from the source task
   4. `updateCommittableOffsets()` is called to update the `committableOffsets` map for any records sent in previous loops that have been acked
   5. `sendRecords()` is called with the records retrieved in step 3 earlier in this same pass, which for each of these new records enqueues a `SubmittedRecord` and calls `producer.send(...)` on each record with a callback that acks the submitted record.
   
   But just after step 1 in the aforementioned pass, the connector and its tasks are paused. This means that on the next pass through the `WorkerSourceTask.execute()` while loop:
   1. `shouldPause()` returns true, so
   2. `onPause()` is called and `awaitUnpause()` is called.
   
   At that point, the thread blocks. But the records that were sent to the producer in step 5 of the previous pass may have already been acked, meaning we _could_ have updated the offsets just before we paused. That might not have been enough time for all of the records submitted in that step to be acked, but if we were to move the `updateCommittableOffsets()` call to just before the `if (shouldPause())` check, then we would get the offsets for as many acked records as possible just before the thread pauses.
   
   In all other non-paused scenarios, I'm not sure it matters where in this loop we call `updateCommittableOffsets()`. But for the just-been-paused scenario, moving it to the first (or last) operation in the loop gives us a bit more of a chance to commit the offsets for as many acked records as possible.
   
   WDYT?
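The ordering argument above can be checked with a toy simulation. This is deliberately not `WorkerSourceTask` code: the class `PauseOrderingSketch`, its single method, and the offset value 42 are all hypothetical, and the producer ack is assumed to arrive between the two passes. It compares the offset visible to the commit thread at the moment the task blocks, depending on where the update call sits in the loop.

```java
// Hypothetical two-pass simulation of the execute() loop ordering.
class PauseOrderingSketch {

    // Returns the offset the commit thread could see when the task blocks on pause.
    // -1 means nothing is committable yet.
    static long committableAtPause(boolean updateBeforePauseCheck) {
        long acked = -1;       // highest offset acked by the simulated producer
        long committable = -1; // what updateCommittableOffsets() last published
        boolean pauseRequested = false;

        for (int pass = 1; pass <= 2; pass++) {
            if (updateBeforePauseCheck) {
                committable = acked; // proposed placement: first operation in the loop
            }
            if (pauseRequested) {
                return committable;  // the real thread would block in awaitUnpause() here
            }
            // poll() would run here
            if (!updateBeforePauseCheck) {
                committable = acked; // original placement: after poll(), before sendRecords()
            }
            // sendRecords(): the record with offset 42 is sent and acked before the next pass
            acked = 42;
            pauseRequested = true;   // the connector is paused during the first pass
        }
        return committable;
    }

    public static void main(String[] args) {
        System.out.println(committableAtPause(false)); // prints -1
        System.out.println(committableAtPause(true));  // prints 42
    }
}
```

With the original placement the acked offset is not published until the task resumes; with the update moved ahead of the pause check it is published before the thread blocks.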







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r726483477



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly the current state. This
-            // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
-            // buffer; and the current set of user-specified offsets, stored in the
-            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish sending
-            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();
+        }
 
-            if (!flushStarted) {
-                // There was nothing in the offsets to process, but we still waited for the data in the
-                // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.
-                // flush time, which can be used for monitoring even if the connector doesn't record any
-                // offsets.
-                finishSuccessfulFlush();
-                long durationMillis = time.milliseconds() - started;
-                recordCommitSuccess(durationMillis);
-                log.debug("{} Finished offset commitOffsets successfully in {} ms",
-                        this, durationMillis);
-
-                commitSourceTask();
-                return true;
-            }
+        offsetsToCommit.forEach(offsetWriter::offset);
+        if (!offsetWriter.beginFlush()) {
+            // There was nothing in the offsets to process, but we still waited for the data in the
+            // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.

Review comment:
       This is an [existing comment](https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L535-L538) that was just moved around in this PR. I can update it to remove language that refers to flushing a buffer.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720419911



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully

Review comment:
       Sure, will do.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720395810



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -570,22 +512,21 @@ public boolean commitOffsets() {
             // could look a little confusing.
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();

Review comment:
       I think a retry notice is fine. The term "new offsets" may be a little misleading since the offsets that we failed to commit may still be partially or entirely present the next time we try to commit (they're tracked by the offset writer class and retained when `cancelFlush` is invoked).
   
   WDYT about "latest" instead of "new"?







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r737632973



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       Thank you both for your thoughts.
   
   I think there are a few things at play here that are worth discussing:
   
   1. Whether this change should be backported.
   1. Whether preserving existing behavior should be prioritized.
   1. What the potential benefits of the current multi-deque approach are.
   1. What the conditions for re-evaluating a multi-deque approach are.
   
   ### Backporting
   Even before the single- vs. multi-deque comments were made, I wasn't sure that this change would be suitable for a backport. It's fairly aggressive and comes with moderate risk, and blurs the line between a bug fix and an improvement. Is there a case where this change would "fix" an otherwise irrevocably-broken connector? If a task's producer is overwhelmed by the throughput of messages provided from the task's `poll` method, for example, this change will not fix the underlying issue; it will only reduce the number of duplicates produced by the task if/when it is restarted after being reconfigured to better handle the throughput of data it is processing.
   
   ### Existing behavior
   The Connect API and documentation make no guarantee about blocking offset commits for source partitions. They also make no guarantee about the order in which [`SourceTask::commitRecord`](https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)) is invoked (and, in fact, the current behavior for that method more closely mirrors the equivalent source offset commit behavior provided by a multi-deque approach: commits of source records targeting unrelated partitions do not block each other).
   
   Additionally, I cannot think of a development style, external system, or other kind of use case that would rely on the order of source offset commits (across different source partitions) matching the order of the source partition/offset pairs as they were provided from the task in `SourceTask::poll`.
   
   Essentially, this behavior change violates no contract of the Connect API and, at the moment, there is no known or even reasonable case of a connector that would be affected by it. If one can be provided (even using the most abstract kind of external system or developer mindset), then I do agree that it's worth it to preserve existing behavior, but at the moment it seems the risk here is being overstated.
   
   One other consideration that should be made is that the existing metrics on offset commits, which currently serve as an indicator of a task's producer's ability to keep up with the throughput of records produced by the task, will no longer be useful on that front. If a batch could not be flushed in time for an offset commit, the offset commit would be marked failed, and alerts for those offset commits could be triggered to indicate that the task is unhealthy despite not being in the `FAILED` status. This is a much more realistic scenario (that I personally have witnessed in production); should we respect this use case as well and simply abandon the current effort to optimize offset commits altogether, or at least choose not to backport it for this reason?
   
   ### Benefits of multiple deques
   If a subset of the topic partitions that a task is writing to is unavailable for a period of time, then writes to those topic partitions will start failing (and being infinitely retried, if the [default Connect producer properties](https://github.com/apache/kafka/blob/195ebff25b637bf09ce3283e204cfb1faffe34fc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L649-L654) are used). Writes to other topic partitions will still succeed, and (assuming the task does not perform some fairly-sophisticated filtering of how it polls from the source system based on invocations of `SourceTask::commitRecord`) the task will continue producing data destined for all topic partitions with no knowledge that writes to that affected subset are not yet succeeding.
   
   If the task is gracefully restarted (i.e., has time to shut down, release resources, and perform an end-of-life offset commit) during this period, the differences between the single-deque and multi-deque approach can be potentially enormous, depending on how long the topic partitions were unavailable for and whether there is a correlation between the source partitions for the records the task produces and the physical Kafka topic partitions that those records are sent to. One known case where there is a direct correlation is MirrorMaker 2.
   
   A slightly weaker but still valid case is that if the task's producer has issues producing to a subset of topic partitions, the same potential difference arises. Of course, in this case it's best to reconfigure the task (or its producer), or perhaps even open a PR to fix a bug or make an improvement to the producer logic. But in the meantime, we should aim to make Kafka Connect as resilient as possible in the face of these kinds of degradation or even failure scenarios.
   
   ### Conditions for re-evaluation
   There are already known cases where the multi-deque approach would be beneficial. Given this, the only serious reason to refrain from implementing it is to preserve behavior for connectors that, I believe, likely do not exist. However, it's impossible to prove that particular negative, so it's worth considering: what would have to change between now and some point in the future for the value that's being placed on these hypothetical connectors to diminish?
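
   To make the single- vs. multi-deque difference concrete, here is a minimal sketch. It is not the code in this PR: `DequeComparison`, `Submitted`, `singleDeque`, and `multiDeque` are hypothetical names, and source partitions and offsets are simplified to a `String` and a `long`. It assumes source partition "A" maps to an unavailable topic partition, so its records stay unacknowledged while records for "B" are ack'd.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DequeComparison {

    // Hypothetical stand-in for a submitted record; not the PR's SubmittedRecord class.
    static final class Submitted {
        final String partition;
        final long offset;
        final boolean acked;

        Submitted(String partition, long offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Single deque: the first unacknowledged record blocks everything submitted after it.
    static Map<String, Long> singleDeque(List<Submitted> submitted) {
        Map<String, Long> committable = new HashMap<>();
        for (Submitted r : submitted) {
            if (!r.acked) {
                break;
            }
            committable.put(r.partition, r.offset);
        }
        return committable;
    }

    // One deque per source partition: an unacknowledged record only blocks its own partition.
    static Map<String, Long> multiDeque(List<Submitted> submitted) {
        Map<String, Deque<Submitted>> deques = new LinkedHashMap<>();
        for (Submitted r : submitted) {
            deques.computeIfAbsent(r.partition, p -> new ArrayDeque<>()).add(r);
        }
        Map<String, Long> committable = new HashMap<>();
        deques.forEach((partition, deque) -> {
            // Drain acknowledged records from the head; the last one drained wins.
            while (deque.peek() != null && deque.peek().acked) {
                committable.put(partition, deque.poll().offset);
            }
        });
        return committable;
    }

    public static void main(String[] args) {
        List<Submitted> submitted = Arrays.asList(
                new Submitted("A", 1, false),
                new Submitted("B", 1, true),
                new Submitted("B", 2, true),
                new Submitted("A", 2, false),
                new Submitted("B", 3, true));
        System.out.println("single deque: " + singleDeque(submitted));
        System.out.println("multi deque:  " + multiDeque(submitted));
    }
}
```

   With this input the single-deque variant has nothing to commit, because the very first record (for "A") is unacknowledged, while the per-partition variant can still commit offset 3 for "B".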







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r741279732



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       👍 SGTM. I've updated the PR accordingly.
   
   One nit: the "flushing <n> outstanding messages for offset commit" message actually refers to the number of unacked messages in the current batch and not the number of acknowledged messages for which offsets will be committed; this has tripped up many of my colleagues who see "flushing 0 outstanding messages" and think their source connector isn't producing any data when all it really means is that its producers are keeping up with the throughput of its tasks very well.
   
   I think both pieces of information (number of acked and unacked messages) are useful here so I've included both in the latest draft.







[GitHub] [kafka] C0urante edited a comment on pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante edited a comment on pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#issuecomment-932434127


   Thanks @rhauch, I've addressed all of the comments that seemed straightforward and left a response on the ones where a bit of discussion seems warranted before making changes. This is ready for another round.





[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720433703



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.

Review comment:
       Sure, will take a stab. LMK what you think.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r728049488



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -250,6 +248,9 @@ public void execute() {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
+
+                updateCommittableOffsets();
+

Review comment:
       Ugh, sorry. Your initial point was very clear, although I really appreciate the detailed writeup here. It was an implementation snafu. I wanted to handle the case where `poll` produced no records, which meant invoking `updateCommittableOffsets` before the `if (toSend == null) continue;` section. Of course, that didn't actually address the original concern, which is that we may miss a chance to update offsets for records just-dispatched to the producer in `sendRecords`.
   
   I like the idea of placing `updateCommittableOffsets` right before the `if (shouldPause())` check, at the top of the loop; will do.
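
   A rough sketch of that ordering, assuming nothing about the real `WorkerSourceTask` beyond what is discussed above: `LoopOrderSketch`, `run`, and the step strings are hypothetical stand-ins, and each entry of `pollResults` plays the role of one return value of `poll` (with `null` meaning no records).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LoopOrderSketch {

    // Returns the sequence of steps taken, one loop iteration per simulated poll result.
    static List<String> run(List<List<String>> pollResults) {
        List<String> steps = new ArrayList<>();
        for (List<String> toSend : pollResults) {
            // Top of the loop: runs on every iteration, including those where poll
            // returns nothing, and sees acks for records dispatched one iteration earlier.
            steps.add("updateCommittableOffsets");
            // (the shouldPause() check would follow here)
            if (toSend == null) {
                steps.add("poll:null");
                continue;
            }
            steps.add("sendRecords:" + toSend.size());
        }
        return steps;
    }

    public static void main(String[] args) {
        List<List<String>> pollResults = Arrays.asList(Arrays.asList("r1", "r2"), null);
        System.out.println(run(pollResults));
    }
}
```

   The point of the placement is that the update step is never skipped by the early `continue`, and the records dispatched in one iteration get their offsets picked up at the start of the next.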




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#issuecomment-940353269


   Thanks @rhauch. Ready for another round when you have time.





[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r717740096



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer

Review comment:
       Nit on indentation:
   ```suggestion
        * @param record the record about to be dispatched; may not be null but may have a null
        *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
        * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
        *         the producer, or {@link #remove removed} if synchronously rejected by the producer
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.

Review comment:
       Should this mention that it's only possible to remove the `SubmittedRecord` most recently submitted via `submit(...)`?
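
   For reference when weighing that wording, a small sketch of what `java.util.Deque#removeLastOccurrence` does on its own: it searches from the tail and removes the last element equal to the argument, wherever that element sits, returning `false` if there is none. The class name, `dequeOf` helper, and string elements below are hypothetical stand-ins, not `SubmittedRecord` instances; whether `remove` should be documented as accepting only the most recent record is a separate question.

```java
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;

public class RemoveLastOccurrenceSketch {

    // Hypothetical helper: builds a LinkedList-backed deque in submission order.
    static Deque<String> dequeOf(String... elements) {
        return new LinkedList<>(Arrays.asList(elements));
    }

    public static void main(String[] args) {
        Deque<String> deque = dequeOf("r1", "r2", "r3");

        // Removes "r2" even though it is not the most recently added element.
        boolean removedMiddle = deque.removeLastOccurrence("r2");
        System.out.println(removedMiddle + " " + deque);

        // Nothing equal to "r9" is present, so the deque is left unchanged.
        boolean removedMissing = deque.removeLastOccurrence("r9");
        System.out.println(removedMissing + " " + deque);
    }
}
```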

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.

Review comment:
       WDYT about putting more design context here, such as this class tracking the source offsets for each of the source partitions _in the same order in which the records were returned by the source task's `poll()` method_, and that this class returns the committable offset for any source partition as the offset from the latest submitted record with that source partition that was acknowledged by the producer?
   
   I think it's important that we define the semantics clearly.
   
   Also:
   ```suggestion
    * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
    * source offsets.
    * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
    * {@link SubmittedRecord#ack() acknowledged} from a different thread.
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);
+        return result;
+    }
+
+    // Note that this will return null if either there are no committable offsets for the given deque, or the latest
+    // committable offset is itself null. The caller is responsible for distinguishing between the two cases.
+    private Map<String, Object> committableOffset(Deque<SubmittedRecord> queuedRecords) {
+        Map<String, Object> result = null;
+        while (canCommitHead(queuedRecords)) {
+            result = queuedRecords.poll().offset();
+        }
+        return result;
+    }
+
+    private boolean canCommitHead(Deque<SubmittedRecord> queuedRecords) {
+        return queuedRecords.peek() != null && queuedRecords.peek().acked();
+    }
+
+    static class SubmittedRecord {
+        private final Map<String, Object> partition;
+        private final Map<String, Object> offset;
+        private volatile boolean acked;
+
+        public SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
+            this.partition = partition;
+            this.offset = offset;
+            this.acked = false;
+        }
+
+        /**
+         * Acknowledge this record; signals that its offset may be safely committed.

Review comment:
       Maybe add:
   ```suggestion
            * Acknowledge this record; signals that its offset may be safely committed.
            * This is safe to be called from a different thread than what called {@link SubmittedRecords#submit(SourceRecord)}.
   ```
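
For illustration, the cross-thread use that the suggested sentence describes could look like the following minimal sketch. `AckFlag` is a hypothetical stand-in (not the PR's `SubmittedRecord`) that models only the `volatile` flag: one thread sets it, as a producer callback might, and another thread polls it with no other synchronization.

```java
class AckVisibilitySketch {
    // Hypothetical stand-in for SubmittedRecord; only the acked flag is modeled
    static class AckFlag {
        private volatile boolean acked = false;

        void ack() {
            acked = true;
        }

        boolean acked() {
            return acked;
        }
    }

    // Acks on a separate thread and polls the flag from the calling thread
    static boolean ackFromOtherThread() {
        AckFlag flag = new AckFlag();
        Thread callbackThread = new Thread(flag::ack);
        callbackThread.start();
        // No lock or join here: the volatile write is what makes the update visible to this loop
        while (!flag.acked()) {
            Thread.yield();
        }
        return flag.acked();
    }

    public static void main(String[] args) {
        System.out.println(ackFromOtherThread());
    }
}
```

Without `volatile`, the polling loop would have no guarantee of ever observing the write.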

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "pcj");
+    private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset);
+        // Record has been submitted but not yet acked; cannot commit offsets for it yet
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Record has been acked; can commit offsets for it
+        assertEquals(Collections.singletonMap(PARTITION1, offset), submittedRecords.committableOffsets());
+
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition1Record2.ack();
+        // One record has been ack'd, but a record that comes before it and corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition2Record1.ack();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), submittedRecords.committableOffsets());
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, submittedRecords.committableOffsets());
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemove() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset());
+        submittedRecords.remove(submittedRecord);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }

Review comment:
       This is the only test method for `remove(...)`, but as mentioned above the `remove(...)` method does nothing when it is called with any previously-submitted record other than the most recently submitted record. It'd be good to add another test for the case of trying to remove a previously-submitted record that wasn't the most recently submitted. And to verify that, it might be useful for `SubmittedRecords.remove(...)` to return a boolean as to whether it was successfully removed.
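
A rough sketch of what such a check could look like, assuming a simplified stand-in `Tracker` class (hypothetical; it uses `String` partitions and plain `Object` records rather than the PR's types) whose `remove` returns the boolean reported by `Deque::removeLastOccurrence`:

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

class RemoveSketch {
    // Hypothetical, simplified stand-in for SubmittedRecords
    static class Tracker {
        private final Map<String, Deque<Object>> records = new HashMap<>();

        Object submit(String partition) {
            Object record = new Object();
            records.computeIfAbsent(partition, p -> new LinkedList<>()).add(record);
            return record;
        }

        // Returns true only if the record was being tracked and has now been removed
        boolean remove(String partition, Object record) {
            Deque<Object> deque = records.get(partition);
            return deque != null && deque.removeLastOccurrence(record);
        }

        int size(String partition) {
            Deque<Object> deque = records.get(partition);
            return deque == null ? 0 : deque.size();
        }
    }

    // Submits two records, removes the earlier one twice, and reports the outcome
    static int[] removeEarlierRecord() {
        Tracker tracker = new Tracker();
        Object first = tracker.submit("p1");
        tracker.submit("p1");
        int removedOnce = tracker.remove("p1", first) ? 1 : 0;
        int removedTwice = tracker.remove("p1", first) ? 1 : 0;
        return new int[] {removedOnce, removedTwice, tracker.size("p1")};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(removeEarlierRecord()));
    }
}
```

With a boolean return, a test can assert directly on whether an earlier record was removed instead of inferring it from the committable offsets.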

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -570,22 +512,21 @@ public boolean commitOffsets() {
             // could look a little confusing.
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();

Review comment:
       I wonder if it would be worth improving this log message slightly, to something like:
   > Timed out waiting to flush offsets to storage; will try again on next flush interval with new offsets
   
   Strictly speaking, it's unrelated to the changes made in this PR. But for users seeing this in the log it would be helpful to know that despite it being an error that should be looked into, the next flush interval will attempt to commit all (potentially-updated) offsets.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "pcj");

Review comment:
       Maybe we could use a different value here.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -94,14 +95,9 @@
     private final TopicCreation topicCreation;
 
     private List<SourceRecord> toSend;
-    private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
-    // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
-    // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
-    // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
-    private CountDownLatch stopRequestedLatch;
+    private volatile Map<Map<String, Object>, Map<String, Object>> offsets;

Review comment:
       WDYT about renaming this as `committableOffsets` to be more clear where this is (and should be) used?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);

Review comment:
       WDYT about adding a unit test that verifies that deques are indeed removed when they are empty? Might require adding some protected method or two in this class, but I think it'd be worth it.
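
One possible shape for that test, sketched against a simplified stand-in class rather than the real `SubmittedRecords`. The `trackedPartitions()` accessor is a hypothetical test hook, and each deque element is just the record's "acked" flag:

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

class DequeCleanupSketch {
    // Each element stands in for a submitted record and holds only its acked flag
    private final Map<String, Deque<Boolean>> records = new HashMap<>();

    void submit(String partition, boolean acked) {
        records.computeIfAbsent(partition, p -> new LinkedList<>()).add(acked);
    }

    // Poll acked records from the head of each deque, then drop deques that became empty
    void drainAcked() {
        records.values().forEach(deque -> {
            while (deque.peek() != null && deque.peek()) {
                deque.poll();
            }
        });
        records.values().removeIf(Deque::isEmpty);
    }

    // Hypothetical test-only accessor
    int trackedPartitions() {
        return records.size();
    }

    static int partitionsLeftAfterDrain() {
        DequeCleanupSketch sketch = new DequeCleanupSketch();
        sketch.submit("p1", true);
        sketch.submit("p1", true);
        sketch.submit("p2", false);
        sketch.drainAcked();
        return sketch.trackedPartitions();
    }

    public static void main(String[] args) {
        System.out.println(partitionsLeftAfterDrain());
    }
}
```

Here the fully-acked partition disappears from the map while the partition with an unacked head stays tracked.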

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       Per our original discussion, we agreed that it'd be better if the `WorkerSourceTask` thread iterates over the submitted records to find those that have been acked and accumulate the acked source partition and source offsets. The benefit is that the offset thread that periodically commits offsets for all source tasks only has to grab the committed offsets and then commit them.
   
   This is a nice place to do this work. 💯 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.

Review comment:
       The first sentence of the JavaDoc is a bit misleading, since this method doesn't remove _all_ acknowledged records:
   ```suggestion
        * Clear out any acknowledged records at the head of the deques and return the latest offset for each source partition that can be committed.
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully

Review comment:
       This is essentially unbounded, meaning we could exhaust memory if the source task keeps producing records to a Kafka partition that remains offline for an extended period of time. That is already the case with the prior behavior, so this is no worse.
   
   Might be worth mentioning in the PR description.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.

Review comment:
       Maybe add:
   ```suggestion
        * Enqueue a new source record before dispatching it to a producer.
        * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
        * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully
        * sent to the producer.
        * 
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -291,6 +286,13 @@ private void maybeThrowProducerSendException() {
         }
     }
 
+    private void updateCommittableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> newOffsets = submittedRecords.committableOffsets();
+        synchronized (this) {
+            offsets.putAll(newOffsets);
+        }

Review comment:
       Can we avoid this synchronized block if `newOffsets` is empty?
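
Something along these lines, as a sketch only. The class below is a hypothetical stand-in for the relevant part of `WorkerSourceTask`, with `String`/`Long` in place of the real partition and offset maps and a `lockAcquisitions` counter added purely to make the effect observable:

```java
import java.util.HashMap;
import java.util.Map;

class CommittableOffsetsSketch {
    private final Map<String, Long> offsets = new HashMap<>();
    // Hypothetical counter; exists only so the example can show how often the lock is taken
    private int lockAcquisitions = 0;

    void update(Map<String, Long> newOffsets) {
        if (newOffsets.isEmpty()) {
            return; // nothing to merge, so skip the lock entirely
        }
        synchronized (this) {
            lockAcquisitions++;
            offsets.putAll(newOffsets);
        }
    }

    static int lockAcquisitionsAfterUpdates() {
        CommittableOffsetsSketch sketch = new CommittableOffsetsSketch();
        sketch.update(new HashMap<>());
        Map<String, Long> one = new HashMap<>();
        one.put("p1", 42L);
        sketch.update(one);
        sketch.update(new HashMap<>());
        return sketch.lockAcquisitions;
    }

    public static void main(String[] args) {
        System.out.println(lockAcquisitionsAfterUpdates());
    }
}
```

Only the one non-empty update enters the synchronized block.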




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r726480832



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       Not intentional, can change to compute new offsets after (possibly) invoking `poll()`.







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r727276176



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       See [this comment in a subsequent review](https://github.com/apache/kafka/pull/11323#discussion_r727223026) for a followup. Unresolving for better visibility.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720382154



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.

Review comment:
       That's not actually the case; the use of [`Deque::removeLastOccurrence`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#removeLastOccurrence-java.lang.Object-) is different from [`Deque::removeLast`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#removeLast--) and [`Deque::pollLast`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#pollLast--) in that it can and will remove an element from anywhere in the deque (as opposed to just the element at the very end) if it `equals` the one passed to `removeLastOccurrence`. The reason `removeLastOccurrence` is used here instead of the more-common [`remove`](https://docs.oracle.com/javase/8/docs/api/java/util/Deque.html#remove-java.lang.Object-) is that, at least with the current use of `SubmittedRecords` by `WorkerSourceTask`, any calls to `SubmittedRecords::remove` will pass in the most-recently-submitted record, which will then be at the end of the deque it was inserted into, so there's a (potentially-substantial) performance gain.
   
   I wonder if it might be helpful to alter this method:
   - By renaming to `removeLastOccurrence`
   - By clarifying in the Javadoc that, if multiple instances of the same `SubmittedRecord` have been submitted already, only the first one found (traversing from the end of the deque backward) will be removed
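
   To make the difference concrete, here is a minimal, self-contained sketch using plain strings rather than `SubmittedRecord` instances. The class and method names are made up for illustration; only `Deque::removeLastOccurrence` comes from the JDK.

```java
import java.util.Deque;
import java.util.LinkedList;

final class RemoveLastOccurrenceDemo {
    // Builds [a, b, c] and removes "b", which is not at the tail.
    static Deque<String> removeFromMiddle() {
        Deque<String> deque = new LinkedList<>();
        deque.add("a");
        deque.add("b");
        deque.add("c");
        // Searches from the tail toward the head and removes the first match found,
        // wherever that match sits in the deque.
        deque.removeLastOccurrence("b");
        return deque;
    }

    // With duplicates, only the occurrence closest to the tail is removed.
    static Deque<String> removeDuplicate() {
        Deque<String> deque = new LinkedList<>();
        deque.add("x");
        deque.add("y");
        deque.add("x");
        deque.removeLastOccurrence("x");
        return deque;
    }

    public static void main(String[] args) {
        System.out.println(removeFromMiddle()); // [a, c]
        System.out.println(removeDuplicate());  // [x, y]
    }
}
```

   When the element to remove is already at the tail, the backward search stops on its first comparison, which is where the performance gain mentioned above would come from.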







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r717740096



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer

Review comment:
       Nit on indentation:
   ```suggestion
        * @param record the record about to be dispatched; may not be null but may have a null
        *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
        * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
        *         the producer, or {@link #remove removed} if synchronously rejected by the producer
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.

Review comment:
       Should this mention that it's only possible to remove the `SubmittedRecord` most recently submitted via `submit(...)`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.

Review comment:
       WDYT about putting more design context here? For example, that this class tracks the source offsets for each of the source partitions _in the same order in which the records were returned by the source task's `poll()` method_, and that it returns the committable offset for any source partition as the offset from the latest submitted record with that source partition that was acknowledged by the producer.
   
   I think it's important that we define the semantics clearly.
   
   Also:
   ```suggestion
    * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
    * source offsets.
    * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
    * {@link SubmittedRecord#ack() acknowledged} from a different thread.
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);
+        return result;
+    }
+
+    // Note that this will return null if either there are no committable offsets for the given deque, or the latest
+    // committable offset is itself null. The caller is responsible for distinguishing between the two cases.
+    private Map<String, Object> committableOffset(Deque<SubmittedRecord> queuedRecords) {
+        Map<String, Object> result = null;
+        while (canCommitHead(queuedRecords)) {
+            result = queuedRecords.poll().offset();
+        }
+        return result;
+    }
+
+    private boolean canCommitHead(Deque<SubmittedRecord> queuedRecords) {
+        return queuedRecords.peek() != null && queuedRecords.peek().acked();
+    }
+
+    static class SubmittedRecord {
+        private final Map<String, Object> partition;
+        private final Map<String, Object> offset;
+        private volatile boolean acked;
+
+        public SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
+            this.partition = partition;
+            this.offset = offset;
+            this.acked = false;
+        }
+
+        /**
+         * Acknowledge this record; signals that its offset may be safely committed.

Review comment:
       Maybe add:
   ```suggestion
            * Acknowledge this record; signals that its offset may be safely committed.
            * This is safe to be called from a different thread than what called {@link SubmittedRecords#submit(SourceRecord)}.
   ```
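
   As a rough illustration of why cross-thread `ack()` can be safe, the sketch below models only the `volatile` flag. `AckFlag` is a hypothetical stand-in, not the `SubmittedRecord` class from the PR, and the extra thread merely plays the role of a producer callback thread.

```java
final class AckVisibilityDemo {
    // Hypothetical stand-in for SubmittedRecord; only the acked flag is modeled.
    static final class AckFlag {
        // volatile: a write on one thread is visible to later reads on other threads
        private volatile boolean acked = false;

        void ack() {
            acked = true;
        }

        boolean acked() {
            return acked;
        }
    }

    // Acks on a separate thread, then reads the flag from the calling thread.
    static boolean ackFromOtherThread() {
        AckFlag flag = new AckFlag();
        Thread callbackThread = new Thread(flag::ack);
        callbackThread.start();
        try {
            // join() only makes this demo deterministic; a real worker would not
            // join on the callback thread and would rely on the volatile flag instead.
            callbackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flag.acked();
    }

    public static void main(String[] args) {
        System.out.println(ackFromOtherThread()); // true
    }
}
```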

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "pcj");
+    private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset);
+        // Record has been submitted but not yet acked; cannot commit offsets for it yet
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Record has been acked; can commit offsets for it
+        assertEquals(Collections.singletonMap(PARTITION1, offset), submittedRecords.committableOffsets());
+
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition1Record2.ack();
+        // One record has been ack'd, but a record that comes before it and corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition2Record1.ack();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), submittedRecords.committableOffsets());
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, submittedRecords.committableOffsets());
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemove() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset());
+        submittedRecords.remove(submittedRecord);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }

Review comment:
       This is the only test method for `remove(...)`, but as mentioned above the `remove(...)` method does nothing when it is called with any previously-submitted record other than the most recently submitted record. It'd be good to add another test for the case of trying to remove a previously-submitted record that wasn't the most recently submitted. And to verify that, it might be useful for `SubmittedRecords.remove(...)` to return a boolean indicating whether the record was successfully removed.
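
   A minimal sketch of what a boolean-returning `remove(...)` and the extra check might look like. `RemoveResultSketch` and `Rec` are hypothetical, simplified stand-ins (string partitions, identity equality, a null check for unknown partitions), not the code in this PR.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical, simplified stand-in for SubmittedRecords.
final class RemoveResultSketch {
    // Assumption: records use identity equality (no equals override).
    static final class Rec {
        final String partition;

        Rec(String partition) {
            this.partition = partition;
        }
    }

    private final Map<String, Deque<Rec>> records = new HashMap<>();

    Rec submit(String partition) {
        Rec rec = new Rec(partition);
        records.computeIfAbsent(partition, p -> new LinkedList<>()).add(rec);
        return rec;
    }

    // Returns true only if the record was still tracked and has now been removed.
    boolean remove(Rec rec) {
        Deque<Rec> deque = records.get(rec.partition);
        return deque != null && deque.removeLastOccurrence(rec);
    }

    int tracked(String partition) {
        Deque<Rec> deque = records.get(partition);
        return deque == null ? 0 : deque.size();
    }

    public static void main(String[] args) {
        RemoveResultSketch sketch = new RemoveResultSketch();
        Rec first = sketch.submit("p1");
        sketch.submit("p1");
        System.out.println(sketch.remove(first)); // true: removed although not most recent
        System.out.println(sketch.remove(first)); // false: no longer tracked
        System.out.println(sketch.tracked("p1")); // 1
    }
}
```

   The return value gives a test something to assert on directly, instead of inferring the outcome from `committableOffsets()`.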

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -570,22 +512,21 @@ public boolean commitOffsets() {
             // could look a little confusing.
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();

Review comment:
       I wonder if it would be worth improving this log message slightly, to something like:
   > Timed out waiting to flush offsets to storage; will try again on next flush interval with new offsets
   
   Strictly speaking, it's unrelated to the changes made in this PR. But for users seeing this in the log it would be helpful to know that despite it being an error that should be looked into, the next flush interval will attempt to commit all (potentially-updated) offsets.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "pcj");

Review comment:
       Maybe we could use a different value here.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -94,14 +95,9 @@
     private final TopicCreation topicCreation;
 
     private List<SourceRecord> toSend;
-    private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
-    // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
-    // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
-    // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
-    private CountDownLatch stopRequestedLatch;
+    private volatile Map<Map<String, Object>, Map<String, Object>> offsets;

Review comment:
       WDYT about renaming this as `committableOffsets` to be more clear where this is (and should be) used?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);

Review comment:
       WDYT about adding a unit test that verifies that deques are indeed removed when they are empty? Might require adding some protected method or two in this class, but I think it'd be worth it.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       Per our original discussion, we agreed that it'd be better if the `WorkerSourceTask` thread iterates over the submitted records to find those that have been acked and accumulates the acked source partitions and source offsets. The benefit is that the offset thread that periodically commits offsets for all source tasks only has to grab the committable offsets and then commit them.
   
   This is a nice place to do this work. 💯 
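
   The division of labour described here could be sketched roughly as follows. This is an illustration under simplifying assumptions (string partitions, `Long` offsets, `synchronized` for the hand-off), and `OffsetSnapshotSketch` and its methods are hypothetical names, not the `WorkerSourceTask` code in this PR.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetSnapshotSketch {
    // Guarded by this; accumulated by the task thread, drained by the offset thread.
    private Map<String, Long> committableOffsets = new HashMap<>();

    // Task thread: fold newly ack'd offsets into the pending snapshot.
    synchronized void updateCommittableOffsets(Map<String, Long> newlyAcked) {
        committableOffsets.putAll(newlyAcked);
    }

    // Offset thread: take everything accumulated so far and start a fresh snapshot.
    synchronized Map<String, Long> takeSnapshot() {
        Map<String, Long> snapshot = committableOffsets;
        committableOffsets = new HashMap<>();
        return snapshot;
    }

    public static void main(String[] args) {
        OffsetSnapshotSketch sketch = new OffsetSnapshotSketch();
        Map<String, Long> acked = new HashMap<>();
        acked.put("p1", 2L);
        sketch.updateCommittableOffsets(acked);
        System.out.println(sketch.takeSnapshot()); // {p1=2}
        System.out.println(sketch.takeSnapshot()); // {}
    }
}
```

   The offset thread does no iteration over individual records in this arrangement; it only swaps out a map.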

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.

Review comment:
       The first sentence of the JavaDoc is a bit misleading, since this method doesn't remove _all_ acknowledged records:
   ```suggestion
        * Clear out any acknowledged records at the head of the deques and return the latest offset for each source partition that can be committed.
   ```
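
To illustrate why "at the head of the deques" matters, here is a minimal sketch of head-only draining. The `Rec` class and the `drainAckedHead` helper are hypothetical stand-ins for illustration, not the actual `SubmittedRecord` API; offsets are simplified to `long` values.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class HeadOnlyDrainSketch {
    // Hypothetical stand-in for SubmittedRecord: an offset plus an acked flag.
    static final class Rec {
        final long offset;
        final boolean acked;

        Rec(long offset, boolean acked) {
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Polls acked records from the head only and returns the offset of the
    // last record polled, or null if the head record is not acked yet.
    static Long drainAckedHead(Deque<Rec> deque) {
        Long latest = null;
        while (!deque.isEmpty() && deque.peek().acked) {
            latest = deque.poll().offset;
        }
        return latest;
    }

    public static void main(String[] args) {
        Deque<Rec> deque = new ArrayDeque<>();
        deque.add(new Rec(1, true));
        deque.add(new Rec(2, false)); // still in flight
        deque.add(new Rec(3, true));  // acked, but stuck behind record 2
        System.out.println("committable offset: " + drainAckedHead(deque));
        System.out.println("records still tracked: " + deque.size());
    }
}
```

In this sketch the acked record at offset 3 stays in the deque because the unacked record at offset 2 sits in front of it, which is the case the reworded JavaDoc sentence describes.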

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        records.get(record.partition())
+                .removeLastOccurrence(record);
+    }
+
+    /**
+     * Clear out any acknowledged records and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully

Review comment:
       This is essentially unbounded, meaning we could exhaust memory if the source task keeps producing records to a Kafka partition that remains offline for an extended period of time. That is already the case with the prior behavior, so this is no worse.
   
   Might be worth mentioning in the PR description.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.

Review comment:
       Maybe add:
   ```suggestion
        * Enqueue a new source record before dispatching it to a producer.
        * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
        * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully
        * sent to the producer.
        * 
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -291,6 +286,13 @@ private void maybeThrowProducerSendException() {
         }
     }
 
+    private void updateCommittableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> newOffsets = submittedRecords.committableOffsets();
+        synchronized (this) {
+            offsets.putAll(newOffsets);
+        }

Review comment:
       Can we avoid this synchronized block if the `newOffsets` is empty?
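
A minimal sketch of what skipping the lock could look like, assuming simplified `Map<String, Long>` offsets rather than the real nested map types. The class name and the `lockAcquisitions` counter are hypothetical and exist only to make the effect observable.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetUpdateSketch {
    private final Map<String, Long> offsets = new HashMap<>();
    // Illustration only: counts how often update() took the lock.
    int lockAcquisitions = 0;

    void update(Map<String, Long> newOffsets) {
        if (newOffsets.isEmpty()) {
            return; // nothing to merge, so do not contend for the lock
        }
        synchronized (this) {
            lockAcquisitions++;
            offsets.putAll(newOffsets);
        }
    }

    // Returns a copy so callers never see concurrent modifications.
    synchronized Map<String, Long> snapshot() {
        return new HashMap<>(offsets);
    }

    public static void main(String[] args) {
        OffsetUpdateSketch sketch = new OffsetUpdateSketch();
        sketch.update(new HashMap<>());
        Map<String, Long> acked = new HashMap<>();
        acked.put("partition-0", 42L);
        sketch.update(acked);
        System.out.println("lock acquisitions: " + sketch.lockAcquisitions);
        System.out.println("offsets: " + sketch.snapshot());
    }
}
```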




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r732984001



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       The choice of having a map of deques here has some interesting implications. 
   
   For cases with multiple partitions, a failure to deliver a produced record within a batch would not stop offsets from being committed for subsequent records (that have been produced and ack'ed successfully), as long as the partition of those records differs from that of the record that hasn't been ack'ed and will eventually fail to be produced. That's a valid choice; however, the main disadvantage I see is that this new logic doesn't always map to the existing behavior. The existing code, which is based on two queues of outstanding messages, does not commit offsets of subsequent successful records once a record is not ack'ed. 
   
   To match the existing behavior with this new and improved non-blocking implementation, it seems to me that we'd only need to use a common global `Deque` instead of this map and stop removing elements once we encounter the first submitted record that has neither been ack'ed nor failed with a retriable exception (like you do for a single entry and a single deque now). 
   
   To summarize a comparison of the two options: 
   * with a global deque we preserve the current semantics and behavior, we make fewer allocations of collections and maps, and the code is a bit simpler. 
   * with a map of deques we avoid a few duplicates by committing offsets of successfully produced records within a batch
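
The difference between the two options can be sketched as follows. This is a simplified model under stated assumptions: source partitions are plain strings, offsets are `long` values, and the hypothetical helpers only compute which offsets would be committable without removing anything from a real deque.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DequeStrategySketch {
    // Hypothetical stand-in for SubmittedRecord.
    static final class Rec {
        final String partition;
        final long offset;
        final boolean acked;

        Rec(String partition, long offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Single global deque: the first unacked record blocks everything after it.
    static Map<String, Long> globalDeque(List<Rec> submitted) {
        Map<String, Long> result = new HashMap<>();
        for (Rec r : submitted) {
            if (!r.acked) {
                break;
            }
            result.put(r.partition, r.offset);
        }
        return result;
    }

    // Map of deques: an unacked record only blocks later records that share
    // its source partition.
    static Map<String, Long> perPartitionDeques(List<Rec> submitted) {
        Map<String, Long> result = new HashMap<>();
        Set<String> blocked = new HashSet<>();
        for (Rec r : submitted) {
            if (blocked.contains(r.partition)) {
                continue;
            }
            if (!r.acked) {
                blocked.add(r.partition);
                continue;
            }
            result.put(r.partition, r.offset);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> submitted = Arrays.asList(
                new Rec("A", 1, true),
                new Rec("A", 2, false), // never acked
                new Rec("B", 1, true),
                new Rec("A", 3, true));
        System.out.println("global deque:        " + globalDeque(submitted));
        System.out.println("per-partition deques: " + perPartitionDeques(submitted));
    }
}
```

With this input, the global deque makes only offset 1 of partition A committable, while the map of deques additionally makes offset 1 of partition B committable. If the task restarts, that extra commit is what avoids re-reading the record from partition B.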
   
   With the above in mind, I wonder if using a single deque is preferable here, most importantly to preserve the existing behavior while getting the benefit of unblocking the offset commit in the presence of failures. Wdyt @C0urante @rhauch ?







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720389261



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -291,6 +286,13 @@ private void maybeThrowProducerSendException() {
         }
     }
 
+    private void updateCommittableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> newOffsets = submittedRecords.committableOffsets();
+        synchronized (this) {
+            offsets.putAll(newOffsets);
+        }

Review comment:
       Yep, can do. I initially held off because there are only two synchronized blocks in the entire class now and they should both take very little time, but after checking out the parent `WorkerTask` it looks like we're doing some calls to the status listener in synchronized blocks as well. Seems worth the extra few lines here to avoid unnecessary synchronization. 👍 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -94,14 +95,9 @@
     private final TopicCreation topicCreation;
 
     private List<SourceRecord> toSend;
-    private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
-    // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
-    // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
-    // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
-    private CountDownLatch stopRequestedLatch;
+    private volatile Map<Map<String, Object>, Map<String, Object>> offsets;

Review comment:
       Sure, fine by me 👍 







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r727228287



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, the latest-eligible offsets for each source partition can be retrieved, where every
+ * record up to and including the record for each returned offset has been either
+ * {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.

Review comment:
       Since I'm asking about another change, I think there is a copy-paste grammatical error in this sentence we could fix.
   ```suggestion
    * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
    * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
    * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -378,7 +370,7 @@ private boolean sendRecords() {
                             log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                             producerSendException.compareAndSet(null, e);

Review comment:
       We're not calling `submittedRecords.removeLastOccurrence(submittedRecord)` here. Were you thinking that we're setting the `producerSendException`, which will cause the `execute()` method to throw this same exception on the next pass and consequently fail the task? 
   
   I think that's the right choice and no changes are required, but I do need to work through it. So pardon my thought process here.
   
   The question is: what happens to records (and `SubmittedRecord` objects and their offsets) that appear after the record that resulted in the asynchronous exception?
   
   What happens depends on what the producer behavior is, or might be in the future. IIRC the exceptions will often be unrecoverable, but it is possible that records could be sent successfully even if they were submitted to the producer _after_ the record that failed, especially when those records were sent to a different topic partition and were actually sent by the producer _before_ the record that failed. After all, from the [producer.send() JavaDoc](https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L839):
   > Callbacks for records being sent to the same partition are guaranteed to execute in order.
   
   Unfortunately, we cannot infer a relationship between the topic partition for a record and its source partition. So any subsequent records that were sent to a different topic partition could still have the same source partition, and thus they should be enqueued into the same deque. Those offsets would not be committed, since their `SubmittedRecord` instances are after the `SubmittedRecord` for the record that failed to send, and the latter would never be acked (as its send failed).
   
   But if any subsequent records were sent to a different topic partition but had a _different_ source partition, their `SubmittedRecord` instances would be in a different deque than the `SubmittedRecord` for the record that failed to send, and their offsets _could_ potentially be committed.
   
   If the committed offsets were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -250,6 +248,9 @@ public void execute() {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
+
+                updateCommittableOffsets();
+

Review comment:
       Sorry, maybe I wasn't clear in my [previous comment about this call](https://github.com/apache/kafka/pull/11323#discussion_r724437761). I think there is an edge case here that we could deal with a bit better. Consider the following scenario as we walk through the loop in `execute()`. The WorkerSourceTask is not paused, and has been sending and committing offsets for records.
   
   On some pass through the `execute()` while loop:
   1. `shouldPause()` returns false
   2. `maybeThrowProducerSendException()` does nothing since no exception was set from the producer
   3. `poll()` is called to get new records from the source task;
   4. `updateCommittableOffsets()` is called to update the `committableOffsets` map for any records sent in previous loops that have been acked
   5. `sendRecords()` is called with the records retrieved in step 3 earlier in this same pass, which for each of these new records enqueues a `SubmittedRecord` and calls `producer.send(...)` on each record with a callback that acks the submitted record.
   
   But just after step 1 in the aforementioned pass, the connector and its tasks are paused. This means that on the next pass through the `WorkerSourceTask.execute()` while loop:
   1. `shouldPause()` returns true, so
   2. `onPause()` is called and `awaitUnpause()` is called.
   
   At that point, the thread blocks. But the records that were sent to the producer in step 5 of the previous pass may have already been acked, meaning we _could_ have updated the offsets just before we paused. That might not have been enough time for all of the records submitted in that step to be acked, but if we were to move the `updateCommittableOffsets()` call to just before the `if (shouldPause())` check then we would get the offsets for as many acked records as possible just before the thread pauses.
   
   In all other non-paused scenarios, I'm not sure it matters where in this loop we call `updateCommittableOffsets()`. But for the just-been-paused scenario, moving it to the first (or last) operation in the loop gives us a bit more of a chance to commit the offsets for as many acked records as possible.
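
The effect of the ordering can be sketched with a tiny simulation. It is a deliberately simplified model, not the real `execute()` loop: offsets are plain `long` values, the offset 42 is arbitrary, and the record sent in the first pass is assumed to be acked before the second pass begins.

```java
class PauseOrderingSketch {
    // Simulates two passes of a simplified execute() loop. Pass 0 sends one
    // record (offset 42) that is assumed to be acked before pass 1 starts,
    // and pass 1 finds the task paused. Returns the highest offset that has
    // been published for the offset commit thread at the moment the task
    // blocks, or -1 if none has been published.
    static long committableAtPause(boolean updateBeforePauseCheck) {
        long acked = -1;       // highest offset acked by the simulated producer
        long committable = -1; // highest offset visible to the commit thread
        boolean[] pausedOnPass = {false, true};
        for (int pass = 0; pass < pausedOnPass.length; pass++) {
            if (updateBeforePauseCheck) {
                committable = acked; // proposed placement: first step of the loop
            }
            if (pausedOnPass[pass]) {
                break; // awaitUnpause() would block here
            }
            // poll() would be called here
            if (!updateBeforePauseCheck) {
                committable = acked; // reviewed placement: after poll()
            }
            // sendRecords(): the record is dispatched and acked before the next pass
            acked = 42;
        }
        return committable;
    }

    public static void main(String[] args) {
        System.out.println("update before pause check: " + committableAtPause(true));
        System.out.println("update after poll():       " + committableAtPause(false));
    }
}
```

In this model the acked offset only becomes committable before the task blocks when the update runs ahead of the pause check.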
   
   WDYT?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       See [this comment in a subsequent review](https://github.com/apache/kafka/pull/11323#discussion_r727223026) for a followup.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       See [this comment in a subsequent review](https://github.com/apache/kafka/pull/11323#discussion_r727223026) for a followup. Unresolving for better visibility.







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r738528766



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #removeLastOccurrence(SubmittedRecord) removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #removeLastOccurrence removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * If multiple instances of the same {@link SubmittedRecord} have been submitted already, only the first one found
+     * (traversing from the end of the deque backward) will be removed.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     * @return whether an instance of the record was removed
+     */
+    public boolean removeLastOccurrence(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
+            return false;
+        }
+        boolean result = deque.removeLastOccurrence(record);
+        if (deque.isEmpty()) {
+            records.remove(record.partition());
+        }
+        if (!result) {
+            log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
+        }
+        return result;
+    }
+
+    /**
+     * Clear out any acknowledged records at the head of the deques and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);
+        return result;
+    }

Review comment:
       @C0urante, right now we have no visibility into the number or size of deques. We can't add a metric without a KIP, but WDYT about adding some DEBUG and/or TRACE log messages here? The benefit of doing it here rather than in the WorkerSourceTask is that it would be much easier to enable DEBUG or TRACE for only these log messages. One disadvantage is that this `committableOffsets()` method is called once per iteration in the `WorkerSourceTask.execute()` method.
   
   I guess an alternative might be to add a method (e.g., `toString()`?) that outputs this information, and then put the log messages in `WorkerSourceTask.commitOffsets()`.
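
A rough sketch of the kind of summary such a method could produce, assuming source partitions are simplified to strings and each deque holds plain offsets. The `summarize` helper and its message format are hypothetical, not something in the PR.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class PendingSummarySketch {
    // Builds a one-line description of how many records are still pending
    // and which deque is the largest.
    static String summarize(Map<String, Deque<Long>> records) {
        int total = 0;
        int largestSize = 0;
        String largestPartition = null;
        for (Map.Entry<String, Deque<Long>> entry : records.entrySet()) {
            int size = entry.getValue().size();
            total += size;
            if (size > largestSize) {
                largestSize = size;
                largestPartition = entry.getKey();
            }
        }
        return String.format(
                "%d pending records across %d deques; largest deque is %s with %d records",
                total, records.size(), largestPartition, largestSize);
    }

    public static void main(String[] args) {
        Map<String, Deque<Long>> records = new HashMap<>();
        records.put("a", new ArrayDeque<>(Arrays.asList(1L, 2L, 3L)));
        records.put("b", new ArrayDeque<>(Arrays.asList(4L)));
        System.out.println(summarize(records));
    }
}
```

Computing the summary is linear in the number of deques, so calling it only from the periodic offset commit rather than on every loop iteration would keep the cost low.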
   
   Thoughts?







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r740478689



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       As you point out, the old log message was:
   ```
    log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
   ```
   This log message had two things it'd be nice to keep:
   1. `this` as the context; and
   2. the number of records whose offsets were being committed (e.g., the number of acked records).
   
   I think both would be good to include, especially if we're saying the number of records whose offsets are _not_ being committed (yet).
   
   The `Pending` class seems pretty useful, but computing the number of acked records is not possible here. WDYT about merging the `SubmittedRecords.committableOffsets()` and `pending()` methods, by having the former return an object that contains the offset map _and_ the metadata that can be used for logging? This class would be like `Pending`, though maybe `CommittableOffsets` is a more apt name. Plus, `WorkerSourceTask` would only have one volatile field that is updated atomically.
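
   For illustration only, a minimal sketch of what such a merged return type could look like. The class name `CommittableOffsetsSketch`, its accessors, and the `describe` helper are all hypothetical and are not the code in this PR; the point is just that one immutable object can carry both the offset map and the counts needed for the log message.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable snapshot pairing committable offsets with logging metadata.
// Every name in this class is illustrative; none is taken from the PR.
final class CommittableOffsetsSketch {
    private final Map<Map<String, Object>, Map<String, Object>> offsets;
    private final int numCommittableMessages;   // acked records covered by `offsets`
    private final int numUncommittableMessages; // records still waiting on an ack
    private final int numDeques;                // source partitions with pending records
    private final int largestDequeSize;
    private final Map<String, Object> largestDequePartition; // null when nothing is pending

    CommittableOffsetsSketch(
            Map<Map<String, Object>, Map<String, Object>> offsets,
            int numCommittableMessages,
            int numUncommittableMessages,
            int numDeques,
            int largestDequeSize,
            Map<String, Object> largestDequePartition) {
        // Defensive copy so the snapshot cannot change after it is handed to another thread
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
        this.numCommittableMessages = numCommittableMessages;
        this.numUncommittableMessages = numUncommittableMessages;
        this.numDeques = numDeques;
        this.largestDequeSize = largestDequeSize;
        this.largestDequePartition = largestDequePartition;
    }

    Map<Map<String, Object>, Map<String, Object>> offsets() {
        return offsets;
    }

    int numCommittableMessages() {
        return numCommittableMessages;
    }

    int numUncommittableMessages() {
        return numUncommittableMessages;
    }

    boolean hasPending() {
        return numUncommittableMessages > 0;
    }

    // Builds the text a caller could hand to its logger, prefixed with the task as context
    String describe(Object context) {
        String prefix = context + " committing offsets for " + numCommittableMessages
                + " acknowledged messages; ";
        if (!hasPending()) {
            return prefix + "no messages are pending";
        }
        return prefix + numUncommittableMessages + " pending messages across " + numDeques
                + " source partitions will not be committed (largest: " + largestDequePartition
                + " with " + largestDequeSize + " pending)";
    }
}
```

   With something along these lines, `WorkerSourceTask` would swap a single reference instead of coordinating two separate fields.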
   







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r741279732



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       👍 SGTM. I've updated the PR accordingly.
   
   One nit: the "flushing <n> outstanding messages for offset commit" message actually refers to the number of unacked messages in the current batch; this has tripped up many of my colleagues who see "flushing 0 outstanding messages" and think their source connector isn't producing any data when all it really means is that its producers are keeping up with the throughput of its tasks very well.
   
   I think both pieces of information (number of acked and unacked messages) are useful here so I've included both in the latest draft.







[GitHub] [kafka] kkonstantine commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r741409428



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       When I made the suggestion to use a single deque in order to avoid considerably changing the behavior we have today, I had forgotten that we explicitly set the producer retries to be infinite. 
   
   Having said that, my initial comment was not about finding a hypothetical connector that would break by this change in behavior, but more about whether it's an intuitive programming model to assume that while records are produced in sequence by a Connect task, only some of them have their offsets persisted, without bubbling up an error to the task or stalling offset commit overall for that task. 
   
   That might be true for most connectors that use different keys per task and consider each offset key (source partition) completely independent from another offset key. (I can also see how many connectors might use only a single key or a few keys per task, since keys are not supposed to scale with the number of produced messages.) But I wouldn't say that the implications of partition unavailability for how tasks commit offsets are that obvious to the connector developer.
   
   My assumption is that Connect sets infinite retries based on the opinion that Kafka is dependable enough that given some time for recovery it will bring all the partitions of a topic to a functioning state and similar delivery throughput. And that's a fair assumption to make. 
   
   If we don't consider backporting this fix, then multiple deques LGTM. 
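
   To make the behavioral difference concrete, here is a simplified model of per-source-partition deques. It is a sketch under assumptions, not the `SubmittedRecords` class from this PR: source partitions and offsets are plain strings, and the names `PerPartitionDequesSketch` and `Tracked` are made up for the example. It shows that an unacknowledged record at the head of one deque holds back only that source partition.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model of tracking submitted records per source partition.
// Hypothetical names throughout; partitions and offsets are strings to keep the example short.
final class PerPartitionDequesSketch {

    static final class Tracked {
        final String offset;
        // Volatile because, in the scenario discussed here, the ack arrives from another thread
        private volatile boolean acked;

        Tracked(String offset) {
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }

        boolean acked() {
            return acked;
        }
    }

    private final Map<String, Deque<Tracked>> deques = new HashMap<>();

    Tracked submit(String sourcePartition, String offset) {
        Tracked tracked = new Tracked(offset);
        deques.computeIfAbsent(sourcePartition, p -> new ArrayDeque<>()).add(tracked);
        return tracked;
    }

    Map<String, String> committableOffsets() {
        Map<String, String> result = new HashMap<>();
        deques.forEach((partition, deque) -> {
            // Only a contiguous run of acked records at the head of a deque can be committed;
            // the last one polled wins, so the latest eligible offset is reported
            while (!deque.isEmpty() && deque.peek().acked()) {
                result.put(partition, deque.poll().offset);
            }
        });
        // Drop drained deques so the map does not grow without bound
        deques.values().removeIf(Deque::isEmpty);
        return result;
    }
}
```

   With a single shared deque, the stalled head record would instead hold back offsets for every source partition behind it, which is the behavior today's batch-based commit is closer to.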







[GitHub] [kafka] rhauch merged pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #11323:
URL: https://github.com/apache/kafka/pull/11323


   





[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720416578



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.

Review comment:
       👍 







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r726485900



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "adifferentvalue");
+    private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEmptyRecords();
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset);
+        // Record has been submitted but not yet acked; cannot commit offsets for it yet
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        submittedRecord.ack();
+        // Record has been acked; can commit offsets for it
+        assertEquals(Collections.singletonMap(PARTITION1, offset), submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record2.ack();
+        // One record has been ack'd, but a record that comes before it and corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition2Record1.ack();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveLastSubmittedRecord() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset());
+        submittedRecords.remove(submittedRecord);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveNotLastSubmittedRecord() {
+        Map<String, Object> partition1Offset = newOffset();
+        Map<String, Object> partition2Offset = newOffset();
+
+        SubmittedRecord recordToRemove = submittedRecords.submit(PARTITION1, partition1Offset);
+        SubmittedRecord lastSubmittedRecord = submittedRecords.submit(PARTITION2, partition2Offset);
+
+        assertNoEmptyDeques();
+
+        submittedRecords.remove(recordToRemove);
+
+        assertNoEmptyDeques();
+        // The only record for this partition has been removed; we shouldn't be tracking a deque for it anymore
+        assertRemovedDeques(PARTITION1);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        recordToRemove.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        lastSubmittedRecord.ack();
+        // Now that the last-submitted record has been ack'd, we should be able to commit its offset
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset), submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();

Review comment:
       This is redundant; if `assertEmptyRecords()` passes, then there are no deques left in the `SubmittedRecords` instance to perform any assertions on.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r726477234



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every
+ * record up to and including the record for each returned offset has been either
+ * {@link SubmittedRecord#ack() acknowledged} or {@link #remove(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record for partition {}, but no records with that partition are present", record.partition());

Review comment:
       Happy to make the change, although I personally doubt that any amount of rewording would make this message meaningful to anyone who isn't actively reviewing the source code. It's such an internal-facing detail; if I were a user, the addition of "from submitted queue" and "appear to have been submitted" wouldn't really change the fact that I would have no idea of how Connect does offset calculation for source tasks or why this might be a problem.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r728049488



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -250,6 +248,9 @@ public void execute() {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
+
+                updateCommittableOffsets();
+

Review comment:
       Ugh, sorry. Your initial point was very clear, although I really appreciate the detailed writeup here. It was an implementation snafu. I wanted to handle the case where `poll` produced no records, which meant invoking `updateCommittableOffsets` before the `if (toSend == null) continue;` section. Of course, that didn't actually address the original concern, which is that we may miss a chance to update offsets for records just-dispatched to the producer in `sendRecords`.
   
   I like the idea of placing `updateCommittableOffsets` right before the `if (shouldPause())` check, at the top of the loop; will do.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -378,7 +370,7 @@ private boolean sendRecords() {
                             log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                             producerSendException.compareAndSet(null, e);

Review comment:
       > So any subsequent records that were sent to a different topic partition could still have the same source partition, and thus they should be enqueued into the same deque. Those offsets would not be committed, since their SubmittedRecord instances are after the SubmittedRecord for the record that failed to send, and the latter would never be acked (as its send failed).
   
   I think this is the "vital" section and it provides a good rationale for why we intentionally keep the failed record in the queue.
   
   > If the committed offsets were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible.
   
   We call `commitOffsets` in a `finally` block for `execute` right now. I think we can address this case by adding another call to `updateCommittableOffsets` right before this end-of-life call to `commitOffsets`. I've done this; LMKWYT.
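
   Roughly, the ordering described in both replies above could be pictured as follows. This is only a sketch: `SourceLoopSteps` and `ExecuteLoopSketch` are hypothetical stand-ins rather than the real `WorkerSourceTask` methods, and the pause check, metrics, and error handling are left out.

```java
// Hypothetical stand-in for the steps of a source task's main loop; not the real WorkerSourceTask API.
interface SourceLoopSteps {
    boolean shouldStop();

    void updateCommittableOffsets();

    void pollAndSend();

    void commitOffsets();
}

final class ExecuteLoopSketch {
    static void execute(SourceLoopSteps steps) {
        try {
            while (!steps.shouldStop()) {
                // Top of the loop: pick up acks for records dispatched on earlier iterations,
                // even if the upcoming poll returns no records
                steps.updateCommittableOffsets();
                steps.pollAndSend();
            }
        } finally {
            // One more pass before the end-of-life commit, so offsets for records that were
            // acked before a failure still get a chance to be committed
            steps.updateCommittableOffsets();
            steps.commitOffsets();
        }
    }
}
```

   The extra call in the `finally` block is cheap when nothing new has been acked, so it seemed safer to include it unconditionally.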







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r724430246



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #remove(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record for partition {}, but no records with that partition are present", record.partition());

Review comment:
       IIUC this is really an unexpected condition, since it's single-threaded and this method is called only in the catch block after sending a record to the producer. But without that context, anyone reading this message in the log file might be confused or even concerned about what "remove record for partition..." means. WDYT about mentioning more of that context, something like:
   ```suggestion
               log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly the current state. This
-            // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
-            // buffer; and the current set of user-specified offsets, stored in the
-            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish sending
-            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();

Review comment:
       In any of the cases below where we call `offsetWriter.cancelFlush()`, it appears that we're relying upon the offset writer to keep the offsets that it was unable to flush -- we're always computing new offsets at this point. 
   
   WDYT about adding a comment above line 474, something along the lines of:
   ```
    // Update the offset writer with any new offsets for records that have been acked.
    // The offset writer will continue to track all offsets until they are able to be successfully flushed.
    // IOW, if the offset writer fails to flush, it keeps those offsets for the next attempt,
    // though we may update them here with newer offsets for acked records.
   ```
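
To make the retention behavior concrete, here is a simplified model of an offset writer that keeps unflushed offsets across a cancelled flush. It is a sketch under assumptions: `RetainingOffsetWriterSketch`, its `pending()` accessor, and the use of `String`/`Long` in place of the partition and offset maps are all hypothetical, and it is not the real `OffsetStorageWriter`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; not the real OffsetStorageWriter.
class RetainingOffsetWriterSketch {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null;

    // Record the latest offset for a partition.
    void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // Snapshot the pending offsets; returns false if there is nothing to flush.
    boolean beginFlush() {
        if (data.isEmpty()) {
            return false;
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // A failed flush: fold the snapshot back in, letting newer offsets win.
    void cancelFlush() {
        if (toFlush != null) {
            toFlush.putAll(data);
            data = toFlush;
            toFlush = null;
        }
    }

    // Copy of the offsets that would be included in the next flush.
    Map<String, Long> pending() {
        return new HashMap<>(data);
    }
}
```

In this model, an offset written before a failed flush is still pending afterwards unless a newer offset for the same partition has replaced it, which is why the caller can hand over a fresh map on every commit attempt.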

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -570,22 +512,21 @@ public boolean commitOffsets() {
             // could look a little confusing.
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();

Review comment:
       So
   > Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets
   
   Sounds good. 
   
   BTW, the fact that the offset writer continues to track the offsets after failed flush attempts is a subtle thing, and it's worth calling out above (see my comment on lines 476-477) to help explain why we can always replace the `offsetsToCommit` map even after the offset writer failed to flush.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "adifferentvalue");
+    private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEmptyRecords();
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset);
+        // Record has been submitted but not yet acked; cannot commit offsets for it yet
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        submittedRecord.ack();
+        // Record has been acked; can commit offsets for it
+        assertEquals(Collections.singletonMap(PARTITION1, offset), submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record2.ack();
+        // One record has been ack'd, but a record that comes before it and corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition2Record1.ack();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveLastSubmittedRecord() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset());
+        submittedRecords.remove(submittedRecord);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveNotLastSubmittedRecord() {
+        Map<String, Object> partition1Offset = newOffset();
+        Map<String, Object> partition2Offset = newOffset();
+
+        SubmittedRecord recordToRemove = submittedRecords.submit(PARTITION1, partition1Offset);
+        SubmittedRecord lastSubmittedRecord = submittedRecords.submit(PARTITION2, partition2Offset);
+
+        assertNoEmptyDeques();
+
+        submittedRecords.remove(recordToRemove);
+
+        assertNoEmptyDeques();
+        // The only record for this partition has been removed; we shouldn't be tracking a deque for it anymore
+        assertRemovedDeques(PARTITION1);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        recordToRemove.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        lastSubmittedRecord.ack();
+        // Now that the last-submitted record has been ack'd, we should be able to commit its offset
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset), submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks
+        assertEmptyRecords();

Review comment:
       Also?
   ```suggestion
           assertEmptyRecords();
           assertNoEmptyDeques();
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       Actually, I now have a question: why did you choose to add it _before_ the `poll()` (a few lines down) rather than after, perhaps after the `if (!sendRecords()) {...}` block below?
   
   The reason I ask is that if one iteration of the while loop polls for records and sends them (where they are sent to the producer and asynchronously acked), but the connector is paused at about the same time, then the offsets for those records will not be committed until after the connector is resumed above. Is that intentional?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.

Review comment:
       > * By clarifying in the Javadoc that, if multiple instances of the same SubmittedRecord have been submitted already, only the first one found (traversing from the end of the deque backward) will be removed
   
   I think this would be great.
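
For reference, `java.util.Deque#removeLastOccurrence` has the semantics described in the quoted Javadoc suggestion. A small illustration, using strings as hypothetical stand-ins for `SubmittedRecord` instances (the class name `RemoveLastOccurrenceSketch` is also made up for this example):

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

class RemoveLastOccurrenceSketch {
    static List<String> demo() {
        Deque<String> deque = new LinkedList<>();
        deque.add("a");
        deque.add("b");
        deque.add("a"); // same element submitted a second time
        deque.add("c");

        // Searches from the tail toward the head and removes only the
        // first match found, i.e. the most recently added "a".
        deque.removeLastOccurrence("a");

        return new ArrayList<>(deque);
    }
}
```

The earlier `"a"` stays at the head of the deque, so it still blocks later elements until it is acknowledged or removed itself.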

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly the current state. This
-            // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
-            // buffer; and the current set of user-specified offsets, stored in the
-            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish sending
-            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();
+        }
 
-            if (!flushStarted) {
-                // There was nothing in the offsets to process, but we still waited for the data in the
-                // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.
-                // flush time, which can be used for monitoring even if the connector doesn't record any
-                // offsets.
-                finishSuccessfulFlush();
-                long durationMillis = time.milliseconds() - started;
-                recordCommitSuccess(durationMillis);
-                log.debug("{} Finished offset commitOffsets successfully in {} ms",
-                        this, durationMillis);
-
-                commitSourceTask();
-                return true;
-            }
+        offsetsToCommit.forEach(offsetWriter::offset);
+        if (!offsetWriter.beginFlush()) {
+            // There was nothing in the offsets to process, but we still waited for the data in the
+            // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.

Review comment:
       What do you mean by "we still waited for the data in the buffer to flush"? The `beginFlush()` method doesn't actually do any flushing; it merely performs the snapshot of the offset writer's data.







[GitHub] [kafka] C0urante commented on pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#issuecomment-918482406


   CC @rhauch; hopefully this is fairly close to what you had in mind.





[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r733010990



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       Great points, @kkonstantine. You're right that using multiple deques does not quite mirror the existing behavior, whereas using a single deque does maintain the same behavior of committing offsets in the same order as they were returned by the task, albeit much more efficiently than the older behavior.
   
   I suspect that most connectors generate messages with proper source partition and source offset maps. However, if any don't then using multiple deques could result in different behavior. IOW, I think the risk is low, but non-zero. Using a single deque avoids that potential issue altogether.
   
   Given that this is a bug fix, I agree it's probably better to maintain as much behavior as possible while only fixing the undesirable behavior (in this case, blocking the offset commit thread). @C0urante did such a great job encapsulating the behavior that only the `SubmittedRecords` class would need to be changed/simplified.
   
   We can always reevaluate in the future whether to more opportunistically commit offsets for _different_ source partitions independent of the supplied record order (the order they were returned from the task), while committing offsets with the _same_ source partition in the supplied record order.
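
To illustrate the difference in behavior, here is a simplified sketch. The `Rec` type, the string partitions, and both method names are hypothetical and not taken from the PR. The point is only that one unacknowledged record blocks everything behind it with a single deque, but only its own source partition with per-partition deques.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DequeStrategySketch {
    // Hypothetical stand-in for a submitted record.
    static final class Rec {
        final String partition;
        final long offset;
        final boolean acked;

        Rec(String partition, long offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Single deque: stop at the first unacked record, whatever its partition.
    static Map<String, Long> singleDeque(List<Rec> submitted) {
        Map<String, Long> committable = new HashMap<>();
        for (Rec r : submitted) {
            if (!r.acked) {
                break;
            }
            committable.put(r.partition, r.offset);
        }
        return committable;
    }

    // One deque per partition: an unacked record only blocks its own partition.
    static Map<String, Long> perPartitionDeques(List<Rec> submitted) {
        Map<String, Deque<Rec>> deques = new LinkedHashMap<>();
        for (Rec r : submitted) {
            deques.computeIfAbsent(r.partition, p -> new ArrayDeque<>()).add(r);
        }
        Map<String, Long> committable = new HashMap<>();
        deques.forEach((partition, deque) -> {
            while (!deque.isEmpty() && deque.peek().acked) {
                committable.put(partition, deque.poll().offset);
            }
        });
        return committable;
    }
}
```

With an unacked record for partition `A` submitted ahead of two acked records for partition `B`, `singleDeque` yields nothing to commit, while `perPartitionDeques` yields the latest offset for `B`.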







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r740478689



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       As you point out, the old log message was:
   ```
    log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
   ```
   This log message had two things it'd be nice to keep:
   1. `this` as the context; and
   2. the number of records whose offsets were being committed (i.e., the number of acked records).
   
   I think both would be good to include, especially if we're saying the number of records whose offsets are _not_ being committed (yet).
   
   The `Pending` class seems pretty useful, but computing the number of acked records is not possible here. WDYT about merging the `SubmittedRecords.committableOffsets()` and `pending()` methods, by having the former return an object that contains the offset map _and_ the metadata that can be used for logging? This class would be like `Pending`, though maybe `CommittableOffsets` is a more apt name. Plus, `WorkerSourceTask` would only have one volatile field that is updated atomically.
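   
   For illustration only, here is a minimal sketch of what such a combined object might look like. Every name in it (`CommittableOffsetsSketch`, `SnapshotHolderSketch`, `mergedWith`, `describe`) is hypothetical and not taken from the PR, the log wording is invented, and an `AtomicReference` stands in for the single volatile field mentioned above:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable snapshot: committable offsets plus the counts needed for logging. */
final class CommittableOffsetsSketch {
    static final CommittableOffsetsSketch EMPTY =
            new CommittableOffsetsSketch(Collections.emptyMap(), 0, 0);

    final Map<Map<String, Object>, Map<String, Object>> offsets;
    final int ackedMessages;   // records whose offsets this snapshot covers
    final int pendingMessages; // records still waiting on a producer ack

    CommittableOffsetsSketch(Map<Map<String, Object>, Map<String, Object>> offsets,
                             int ackedMessages, int pendingMessages) {
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
        this.ackedMessages = ackedMessages;
        this.pendingMessages = pendingMessages;
    }

    /** Folds a newer snapshot into this one: newer offsets win, acked counts add up. */
    CommittableOffsetsSketch mergedWith(CommittableOffsetsSketch newer) {
        Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(offsets);
        merged.putAll(newer.offsets);
        return new CommittableOffsetsSketch(
                merged, ackedMessages + newer.ackedMessages, newer.pendingMessages);
    }

    /** Log text that keeps a context prefix and reports both counts (wording is made up). */
    String describe(Object context) {
        return context + " Committing offsets for " + ackedMessages
                + " acknowledged messages; " + pendingMessages + " messages are still pending";
    }

    public static void main(String[] args) {
        Map<String, Object> partition = Collections.singletonMap("table", "orders");
        Map<String, Object> offset = Collections.singletonMap("position", 42);
        SnapshotHolderSketch holder = new SnapshotHolderSketch();
        holder.publish(new CommittableOffsetsSketch(
                Collections.singletonMap(partition, offset), 5, 2));
        System.out.println(holder.takeForCommit().describe("task-0"));
    }
}

/** Hypothetical stand-in for the single field the task would hold. */
final class SnapshotHolderSketch {
    private final AtomicReference<CommittableOffsetsSketch> latest =
            new AtomicReference<>(CommittableOffsetsSketch.EMPTY);

    /** Called from the polling thread after each pass over the deques. */
    void publish(CommittableOffsetsSketch snapshot) {
        latest.updateAndGet(old -> old.mergedWith(snapshot));
    }

    /** Called from the commit thread: take everything accumulated so far and reset. */
    CommittableOffsetsSketch takeForCommit() {
        return latest.getAndSet(CommittableOffsetsSketch.EMPTY);
    }
}
```

   The point of the sketch is that the offsets and the numbers used for logging travel together, so the commit thread can never pair the offsets from one pass with the metadata from another.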
   







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r740365960



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #removeLastOccurrence(SubmittedRecord) removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #removeLastOccurrence removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * If multiple instances of the same {@link SubmittedRecord} have been submitted already, only the first one found
+     * (traversing from the end of the deque backward) will be removed.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     * @return whether an instance of the record was removed
+     */
+    public boolean removeLastOccurrence(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
+            return false;
+        }
+        boolean result = deque.removeLastOccurrence(record);
+        if (deque.isEmpty()) {
+            records.remove(record.partition());
+        }
+        if (!result) {
+            log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
+        }
+        return result;
+    }
+
+    /**
+     * Clear out any acknowledged records at the head of the deques and return the latest offset for each source partition that can be committed.
+     * Note that this may take some time to complete if a large number of records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent successfully
+     * @return the latest-possible offsets to commit for each source partition; may be empty but never null
+     */
+    public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
+        records.forEach((partition, queuedRecords) -> {
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                result.put(partition, offset);
+            }
+        });
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);
+        return result;
+    }

Review comment:
       I agree with your concerns about excess logging if a message is added to the `WorkerSourceTask::execute` loop.
   
   Since we're removing [this log message](https://github.com/apache/kafka/blob/af8100b94fda4a27511797233e9845078ae8a69f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L510) in this PR, I wonder if we can replace it with something similar? I think users may want to know how many total pending (i.e., unacked) messages there are, how many deques there are, and the number of messages in the largest deque (which may be useful for identifying "stuck" topic partitions).
   
   I'll take a shot at this; LMKWYT.
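   
   As a rough sketch of the three numbers mentioned above (total pending messages, number of deques, and size of the largest deque), something like the following could compute them in one pass. The class name `PendingStatsSketch` and its `of` factory are hypothetical, and the sketch assumes that every element still sitting in a deque counts as pending, i.e. that acknowledged heads have already been polled off:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical summary of pending records, computed in one pass over the deques. */
final class PendingStatsSketch {
    final int totalPendingMessages;
    final int numDeques;
    final Map<String, Object> largestDequePartition; // null when nothing is pending
    final int largestDequeSize;

    private PendingStatsSketch(int totalPendingMessages, int numDeques,
                               Map<String, Object> largestDequePartition, int largestDequeSize) {
        this.totalPendingMessages = totalPendingMessages;
        this.numDeques = numDeques;
        this.largestDequePartition = largestDequePartition;
        this.largestDequeSize = largestDequeSize;
    }

    /** Assumes every element still in a deque is pending; empty deques are ignored. */
    static <T> PendingStatsSketch of(Map<Map<String, Object>, Deque<T>> records) {
        int total = 0;
        int nonEmpty = 0;
        int largestSize = 0;
        Map<String, Object> largestPartition = null;
        for (Map.Entry<Map<String, Object>, Deque<T>> entry : records.entrySet()) {
            int size = entry.getValue().size();
            if (size == 0) {
                continue;
            }
            total += size;
            nonEmpty++;
            if (size > largestSize) {
                largestSize = size;
                largestPartition = entry.getKey();
            }
        }
        return new PendingStatsSketch(total, nonEmpty, largestPartition, largestSize);
    }

    public static void main(String[] args) {
        Map<Map<String, Object>, Deque<String>> records = new HashMap<>();
        records.put(Collections.singletonMap("table", "orders"),
                new ArrayDeque<>(Arrays.asList("r1", "r2", "r3")));
        records.put(Collections.singletonMap("table", "users"),
                new ArrayDeque<>(Arrays.asList("r4")));
        PendingStatsSketch stats = of(records);
        System.out.println(stats.totalPendingMessages + " pending across " + stats.numDeques
                + " source partitions; largest is " + stats.largestDequePartition
                + " with " + stats.largestDequeSize);
    }
}
```

   A deque that is much larger than the rest is the signal that would point at a "stuck" topic partition.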

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       👍 SGTM. I've updated the PR accordingly.
   
   One nit: the "flushing <n> outstanding messages for offset commit" message actually refers to the number of unacked messages in the current batch, not the number of acknowledged messages for which offsets will be committed. This has tripped up many of my colleagues, who see "flushing 0 outstanding messages" and think their source connector isn't producing any data, when all it really means is that its producers are keeping up with the throughput of its tasks.
   
   I think both pieces of information (number of acked and unacked messages) are useful here so I've included both in the latest draft.







[GitHub] [kafka] kkonstantine commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r741409428



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       When I made the suggestion to use a single deque in order to avoid considerably changing the behavior we have today, I had forgotten that we explicitly set the producer retries to be infinite. 
   
   Having said that, my initial comment was not about finding a hypothetical connector that this change in behavior would break, but about whether it's an intuitive programming model: records are produced in sequence by a Connect task, yet only some of them have their offsets persisted, without an error bubbling up to the task or offset commits stalling overall for that task.
   
   That might be true for most connectors that use different keys per task and consider each offset key (source partition) completely independent of every other offset key. (I can also see how many connectors might use only one or a few keys per task, since keys are not supposed to scale with the number of produced messages.) But I wouldn't say that the implications of partition unavailability for how tasks commit offsets are that obvious to the connector developer.
   
   My assumption is that Connect sets infinite retries based on the opinion that Kafka is dependable enough that given some time for recovery it will bring all the partitions of a topic to a functioning state and similar delivery throughput. And that's a fair assumption to make. 
   
   If we don't consider backporting this fix, then multiple deques LGTM. 
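   
   To make the behavioral difference concrete, here is a simplified model of per-source-partition tracking. It is not the PR's `SubmittedRecords` class: the names `PerPartitionDequesSketch` and `Rec` are hypothetical, and source partitions and offsets are reduced to strings and ints. It shows offsets for one source partition advancing while another source partition is held back by a single unacknowledged record:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of one deque per source partition. */
final class PerPartitionDequesSketch {

    /** A submitted record: its offset plus an ack flag the producer callback would set. */
    static final class Rec {
        final int offset;
        volatile boolean acked;

        Rec(int offset) {
            this.offset = offset;
        }
    }

    private final Map<String, Deque<Rec>> records = new HashMap<>();

    Rec submit(String partition, int offset) {
        Rec rec = new Rec(offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(rec);
        return rec;
    }

    /** Polls acked records off the head of each deque; reports the last offset polled per partition. */
    Map<String, Integer> committableOffsets() {
        Map<String, Integer> result = new HashMap<>();
        records.forEach((partition, deque) -> {
            Integer latest = null;
            while (!deque.isEmpty() && deque.peek().acked) {
                latest = deque.poll().offset;
            }
            if (latest != null) {
                result.put(partition, latest);
            }
        });
        // Drop drained deques so the map does not grow without bound
        records.values().removeIf(Deque::isEmpty);
        return result;
    }

    public static void main(String[] args) {
        PerPartitionDequesSketch tracker = new PerPartitionDequesSketch();
        Rec stuck = tracker.submit("A", 1);
        tracker.submit("A", 2).acked = true; // acked, but blocked behind offset 1
        tracker.submit("B", 7).acked = true;
        System.out.println(tracker.committableOffsets()); // only B can advance here
        stuck.acked = true;
        System.out.println(tracker.committableOffsets()); // A moves to its latest acked offset
    }
}
```

   With a single deque shared by all source partitions, the unacknowledged record for "A" would also hold back the offset for "B", which is the stalling behavior described above.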







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r726477234



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #remove(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #remove removed} if synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record for partition {}, but no records with that partition are present", record.partition());

Review comment:
       Happy to make the change, although I personally doubt that any amount of rewording would make this message meaningful to anyone who isn't actively reviewing or already familiar with the source code. It's such an internal-facing detail; if I were a user, the addition of "from submitted queue" and "appear to have been submitted" wouldn't really change the fact that I would have no idea of how Connect does offset calculation for source tasks or why this might be a problem.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720384099



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "pcj");
+    private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset);
+        // Record has been submitted but not yet acked; cannot commit offsets for it yet
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Record has been acked; can commit offsets for it
+        assertEquals(Collections.singletonMap(PARTITION1, offset), submittedRecords.committableOffsets());
+
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition1Record2.ack();
+        // One record has been ack'd, but a record that comes before it and corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition2Record1.ack();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), submittedRecords.committableOffsets());
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, submittedRecords.committableOffsets());
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemove() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset());
+        submittedRecords.remove(submittedRecord);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets());
+    }

Review comment:
       As discussed above, the behavior of `SubmittedRecords::remove` is slightly different and it will work even if the to-be-removed record is not the last-submitted one for its deque. However...
   
   > It'd be good to add another test for the case of trying to remove a previously-submitted record that wasn't the most recently submitted.
   
   Agreed, will add one.
   
   > And to verify that, it might be useful for SubmittedRecords.remove(...) to return a boolean as to whether it was successfully removed.
   
   I'm not sure that this would buy us much more than some other assertions on the offsets returned by `committableOffsets`, but I don't see much of a maintenance burden for expanding the contract of `SubmittedRecords::remove` or adding these assertions, so happy to do this.
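   
   For reference, `java.util.Deque#removeLastOccurrence(Object)` already returns a boolean and does not require the element to be at the tail, so a boolean-returning removal can lean on it directly. A small sketch, where the helper name `stopTracking` and the string records are hypothetical stand-ins:

```java
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;

final class RemoveLastOccurrenceSketch {

    /** Returns true only if an element equal to the record was found and removed. */
    static <T> boolean stopTracking(Deque<T> deque, T record) {
        return deque.removeLastOccurrence(record);
    }

    public static void main(String[] args) {
        Deque<String> deque = new LinkedList<>(Arrays.asList("r1", "r2", "r3"));
        System.out.println(stopTracking(deque, "r2")); // true: r2 was not the last-submitted record
        System.out.println(stopTracking(deque, "r2")); // false: it has already been removed
        System.out.println(deque);                     // [r1, r3]
    }
}
```

   A test for the "not the most recently submitted" case could assert on that return value in addition to the offsets reported afterwards.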







[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r727276176



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       See [this comment in a subsequent review](https://github.com/apache/kafka/pull/11323#discussion_r727223026) for a followup.







[GitHub] [kafka] C0urante commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r737632973



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
       Thank you both for your thoughts.
   
   I think there are a few things at play here that are worth discussing:
   
   1. Whether this change should be backported.
   2. Whether preserving existing behavior should be prioritized.
   3. What the potential benefits of the current multi-deque approach are.
   4. What the conditions for re-evaluating a multi-deque approach are.
   
   ### Backporting
   Even before the single- vs. multi-deque comments were made, I wasn't sure that this change would be suitable for a backport. It's fairly aggressive, comes with moderate risk, and blurs the line between a bug fix and an improvement. Is there a case where this change would "fix" an otherwise irrevocably broken connector? If a task's producer is overwhelmed by the throughput of messages provided from the task's `poll` method, for example, this change will not fix the underlying issue; it will only reduce the number of duplicates produced by the task if/when it is restarted after being reconfigured to better handle the throughput of data it is processing.
   
   ### Existing behavior
   The Connect API and documentation make no guarantee about blocking offset commits for source partitions. They also make no guarantee about the order in which [`SourceTask::commitRecord`](https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)) is invoked (and, in fact, the current behavior for that method more closely mirrors the equivalent source offset commit behavior provided by a multi-deque approach: commits of source records targeting unrelated partitions do not block each other).
   
   Additionally, I cannot think of a development style, external system, or other kind of use case that would rely on the order of source offset commits (across different source partitions) matching the order of the source partition/offset pairs as they were provided from the task in `SourceTask::poll`.
   
   Essentially, this behavior change violates no contract of the Connect API and, at the moment, there is no known or even reasonable case of a connector that would be affected by it. If one can be provided (even using the most abstract kind of external system or developer mindset), then I do agree that it's worth it to preserve existing behavior, but at the moment it seems the risk here is being overstated.
   
   ### Benefits of multiple deques
   If a subset of the topic partitions that a task is writing to is unavailable for a period of time, then writes to those topic partitions will start failing (and will be retried indefinitely, if the [default Connect producer properties](https://github.com/apache/kafka/blob/195ebff25b637bf09ce3283e204cfb1faffe34fc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L649-L654) are used). Writes to other topic partitions will still succeed, and (assuming the task does not perform some fairly sophisticated filtering of how it polls from the source system based on invocations of `SourceTask::commitRecord`) the task will continue producing data destined for all topic partitions with no knowledge that writes to that affected subset are not yet succeeding.
   
   If the task is gracefully restarted (i.e., has time to shut down, release resources, and perform an end-of-life offset commit) during this period, the difference between the single-deque and multi-deque approaches can be enormous, depending on how long the topic partitions were unavailable and whether there is a correlation between the source partitions for the records the task produces and the physical Kafka topic partitions that those records are sent to. One known case where there is a direct correlation is MirrorMaker 2.
   
   A slightly weaker but still valid case: if the task's producer has issues producing to a subset of topic partitions, the same potential difference arises. Of course, in this case it's best to reconfigure the task (or its producer), or perhaps even open a PR to fix a bug or make an improvement to the producer logic. But in the meantime, we should aim to make Kafka Connect as resilient as possible in the face of these kinds of degradation or even failure scenarios.
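   
   As a rough illustration of the scenario above, here is a simplified sketch (not the code in this PR) that computes committable offsets with one shared deque and with one deque per source partition. `Rec` is a hypothetical stand-in for `SubmittedRecord`, source partitions are plain strings rather than the `Map<String, Object>` keys the PR uses, and the class and method names are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: every name here is hypothetical and not part of Kafka Connect.
class DequeComparison {

    // Simplified stand-in for SubmittedRecord: source partition, source offset, ack flag.
    static final class Rec {
        final String sourcePartition; // assumption: a string instead of Map<String, Object>
        final long sourceOffset;
        volatile boolean acked;       // set once the producer acknowledges the record

        Rec(String sourcePartition, long sourceOffset, boolean acked) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
            this.acked = acked;
        }
    }

    // Pops acked records from the head of a deque, keeping the latest offset per partition.
    // Stops at the first record that has not been acked yet.
    static void drainAcked(Deque<Rec> deque, Map<String, Long> offsets) {
        while (!deque.isEmpty() && deque.peekFirst().acked) {
            Rec r = deque.pollFirst();
            offsets.put(r.sourcePartition, r.sourceOffset);
        }
    }

    // Single deque: an unacked record blocks every record submitted after it.
    static Map<String, Long> committableSingleDeque(List<Rec> submitted) {
        Map<String, Long> offsets = new HashMap<>();
        drainAcked(new ArrayDeque<>(submitted), offsets);
        return offsets;
    }

    // One deque per source partition: an unacked record only blocks its own partition.
    static Map<String, Long> committableMultiDeque(List<Rec> submitted) {
        Map<String, Deque<Rec>> deques = new LinkedHashMap<>();
        for (Rec r : submitted) {
            deques.computeIfAbsent(r.sourcePartition, p -> new ArrayDeque<>()).addLast(r);
        }
        Map<String, Long> offsets = new HashMap<>();
        for (Deque<Rec> deque : deques.values()) {
            drainAcked(deque, offsets);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Source partition "A" maps to an unavailable topic partition, so its record is never acked.
        // Source partition "B" maps to a healthy topic partition.
        List<Rec> submitted = new ArrayList<>();
        submitted.add(new Rec("A", 1L, false));
        submitted.add(new Rec("B", 1L, true));
        submitted.add(new Rec("B", 2L, true));
        submitted.add(new Rec("B", 3L, true));

        System.out.println("single deque: " + committableSingleDeque(submitted)); // prints "single deque: {}"
        System.out.println("multi deque:  " + committableMultiDeque(submitted));  // prints "multi deque:  {B=3}"
    }
}
```

   In this sketch the unacknowledged record for source partition `A` sits at the head of the single deque, so nothing is committable until it is acked; with per-partition deques, the latest offset for `B` can still be committed. The longer the affected topic partition stays unavailable, the more acknowledged `B` records pile up behind `A` in the single-deque case.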
   
   ### Conditions for re-evaluation
   There are already known cases where the multi-deque approach would be beneficial. Given this, the only serious reason to refrain from implementing it is to preserve behavior for connectors that, I believe, likely do not exist. However, it's impossible to prove that particular negative, so it's worth considering: what would have to change between now and some point in the future for the value that's being placed on these hypothetical connectors to diminish?



