You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/29 21:57:35 UTC

[GitHub] [kafka] jolshan commented on a diff in pull request #12922: KAFKA-14397; Don't reset producer sequence number after delivery timeout

jolshan commented on code in PR #12922:
URL: https://github.com/apache/kafka/pull/12922#discussion_r1035324593


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -808,6 +808,162 @@ public void testIdempotenceWithMultipleInflights() throws Exception {
         assertEquals(1, request2.get().offset());
     }
 
+    @Test
+    public void testIdempotenceWithDeliveryTimeout() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = createTransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        // Send two produce requests with a time gap between them. This allows
+        // us to test delivery timeout expiration of the first request without
+        // affecting the second.
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, client.inFlightRequestCount());
+
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, client.inFlightRequestCount());
+
+        // The first request fails after the delivery timeout is reached, but
+        // we still expect the remaining request to be inflight.
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+        sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1L);
+        sender.runOnce();
+        TestUtils.assertFutureThrows(request1, TimeoutException.class);
+        assertEquals(1, client.inFlightRequestCount());
+
+        // Later the second request returns successfully (which implies the first request
+        // was also successful). A successful result should be returned to the user.
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);

Review Comment:
   Do we have a case where this fails? Implying the first request was not successful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org