You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/07 09:05:52 UTC

[GitHub] [kafka] tombentley commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

tombentley commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r723982484



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -482,6 +486,22 @@ private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record
         }
     }
 
+    private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> record) {
+        if (outstandingMessages.containsKey(record)) {
+            currentBatchFailed = true;
+            if (flushing) {
+                // flush thread may be waiting on the outstanding messages to clear
+                this.notifyAll();
+            }
+        } else if (outstandingMessagesBacklog.containsKey(record)) {
+            backlogBatchFailed = true;
+        }
+    }
+
+    public boolean shouldCommitOffsets() {

Review comment:
       Can we add some Javadoc

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
##########
@@ -214,6 +215,12 @@ public void testCancelAfterAwaitFlush() throws Exception {
         PowerMock.verifyAll();
     }
 
+    private void assertBeginFlush(boolean shouldFlush) {
+        Consumer<Boolean> assertion = shouldFlush ? Assert::assertTrue : Assert::assertFalse;
+        assertion.accept(writer.willFlush());
+        assertion.accept(writer.beginFlush());

Review comment:
       Wouldn't `assertEquals(shouldFlush, writer.willFlush())` and `assertEquals(shouldFlush, writer.beginFlush())` be clearer?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##########
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
     }
 
     private void commit(WorkerSourceTask workerTask) {
+        if (!workerTask.shouldCommitOffsets()) {
+            log.trace("{} Skipping offset commit as there is nothing to be committed", workerTask);

Review comment:
       The method used in the `if` is called `shouldCommitOffsets`, not `areThereOffsetsToBeCommitted`, so is this message accurate, or perhaps the method name is slightly misleading? Maybe amending the message to "...there are no offsets that should be committed"?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org