Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/17 17:32:34 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12412: KAFKA-14079 - Clear out SubmittedRecords in WorkerSourceTask on failed producer errors

C0urante commented on code in PR #12412:
URL: https://github.com/apache/kafka/pull/12412#discussion_r922872804


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -835,24 +836,41 @@ public void testSourceTaskIgnoresProducerException() throws Exception {
         createWorkerTaskWithErrorToleration();
         expectTopicCreation(TOPIC);
 
-        // send two records
+        // send three records
         // record 1 will succeed
         // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
         // and no ConnectException will be thrown
+        // record 3 will succeed
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
         expectSendRecordOnce();
         expectSendRecordProducerCallbackFail();
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());
+        expectSendRecordOnce();
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject());
         EasyMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
         Whitebox.invokeMethod(workerTask, "sendRecords");
 
+        final SubmittedRecords submittedRecords = Whitebox.getInternalState(workerTask, "submittedRecords");
+
+        //Make sure the unacked messages from the producer failure are 0 and have been cleared
+        int numUnackedMessages = Whitebox.getInternalState(submittedRecords, "numUnackedMessages");
+        assertEquals(0, numUnackedMessages);
+
+        //call commitableOffsets to verify that acked records will be cleared
+        //Before KAFKA-14079 acked records would not be cleared if the head was unacked due to an ignored record failure
+        submittedRecords.committableOffsets();
+
+        //Make sure size is 0 for all lists
+        for (Deque<SubmittedRecords.SubmittedRecord> deque : submittedRecords.records.values()) {
+            assertEquals(0, deque.size());
+        }
+

Review Comment:
   It's a little strange that this test interacts so heavily with a private field, and it doesn't explicitly cover the case we're actually trying to test: that we can continue to commit offsets after a record that the producer failed to send has been dropped.
   
   What do you think about testing this case more directly? I'm thinking something like this might work:
   - Using different source offsets for each of the records
   - Invoking `workerTask.commitOffsets()`
   - Verifying that the intended offsets were passed to the mocked offset writer via its `offset` method
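
The behavior under discussion can be illustrated with a minimal, self-contained sketch. It is not Kafka's `SubmittedRecords` implementation: the class `CommittableOffsetsSketch`, its nested `Submitted` type, and the `submit`, `drop`, and `committableOffsets` methods shown here are hypothetical stand-ins, with partitions modeled as strings and offsets as longs. The sketch only shows why giving each record a distinct source offset makes the outcome observable: once the failed record is dropped, the offset of the later record becomes committable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of per-partition in-flight record tracking. */
public class CommittableOffsetsSketch {

    /** One in-flight record: its source partition, its source offset, and its ack state. */
    static final class Submitted {
        final String partition;
        final long offset;
        boolean acked;

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Records are kept in submission order, one deque per source partition.
    private final Map<String, Deque<Submitted>> records = new HashMap<>();

    Submitted submit(String partition, long offset) {
        Submitted submitted = new Submitted(partition, offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(submitted);
        return submitted;
    }

    /** Assumed hook for a tolerated producer failure: stop tracking the record. */
    void drop(Submitted submitted) {
        Deque<Submitted> deque = records.get(submitted.partition);
        if (deque != null) {
            deque.remove(submitted);
        }
    }

    /** Pops acked records from the head of each deque; the last one popped wins. */
    Map<String, Long> committableOffsets() {
        Map<String, Long> offsets = new HashMap<>();
        for (Map.Entry<String, Deque<Submitted>> entry : records.entrySet()) {
            Deque<Submitted> deque = entry.getValue();
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                offsets.put(entry.getKey(), deque.pollFirst().offset);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        CommittableOffsetsSketch submitted = new CommittableOffsetsSketch();
        Submitted record1 = submitted.submit("p0", 1L);
        Submitted record2 = submitted.submit("p0", 2L);
        Submitted record3 = submitted.submit("p0", 3L);

        record1.acked = true;    // record 1 succeeds
        submitted.drop(record2); // record 2 fails in the producer and is dropped
        record3.acked = true;    // record 3 succeeds

        System.out.println(submitted.committableOffsets()); // prints {p0=3}
    }
}
```

In this model, skipping the `drop` call leaves record 2 unacked at the head of the deque, so only offset 1 is ever reported as committable. A test that checks the committed offsets would catch that without reading any private state.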


