Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/29 22:55:45 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12922: KAFKA-14397; Don't reset producer sequence number after delivery timeout

hachikuji commented on code in PR #12922:
URL: https://github.com/apache/kafka/pull/12922#discussion_r1035364675


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -808,6 +808,162 @@ public void testIdempotenceWithMultipleInflights() throws Exception {
         assertEquals(1, request2.get().offset());
     }
 
+    @Test
+    public void testIdempotenceWithDeliveryTimeout() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = createTransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        // Send two produce requests with a time gap between them. This allows
+        // us to test delivery timeout expiration of the first request without
+        // affecting the second.
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, client.inFlightRequestCount());
+
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, client.inFlightRequestCount());
+
+        // The first request fails after the delivery timeout is reached, but
+        // we still expect the remaining request to be inflight.
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+        sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1L);
+        sender.runOnce();
+        TestUtils.assertFutureThrows(request1, TimeoutException.class);
+        assertEquals(1, client.inFlightRequestCount());
+
+        // Later the second request returns successfully (which implies the first request
+        // was also successful). A successful result should be returned to the user.
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.runOnce();
+        assertTrue(request2.isDone());
+        assertEquals(1L, request2.get().offset());
+
+        // The producer epoch should not have been bumped and the next sequence
+        // number should be 2.
+        assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
+        assertEquals(new ProducerIdAndEpoch(producerId, (short) 0), transactionManager.producerIdAndEpoch());
+        appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+    }
+
+    @Test
+    public void testIdempotenceWithRequestTimeoutFollowingDeliveryTimeout() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = createTransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        // Send two produce requests with a time gap between them. This allows
+        // us to test delivery timeout expiration of the first request without
+        // affecting the second.
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, client.inFlightRequestCount());
+
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, client.inFlightRequestCount());
+
+        // Both requests fail with a timeout error. The first one has reached the delivery
+        // timeout and should be failed. The second should be retried.
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+        sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1L);
+        sendIdempotentProducerResponse(1, tp0, Errors.REQUEST_TIMED_OUT, -1L);
+        sender.runOnce();
+        TestUtils.assertFutureThrows(request1, TimeoutException.class);
+        assertFalse(request2.isDone());
+        assertEquals(0, client.inFlightRequestCount());
+
+        // Because we don't know the outcome of these requests yet, we should
+        // continue retrying until we know whether the requests were successful.
+        sender.runOnce();
+        assertEquals(1, client.inFlightRequestCount());
+
+        // Finally the response returns successfully and the result is returned
+        // to the user.
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);

Review Comment:
   Yes, it is possible that the first request succeeded. We just didn't wait long enough to find that out.
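
   As a rough illustration of that point, here is a toy model, not Kafka code. Every name in it (`DeliveryTimeoutSketch`, `ToyPartitionLog`, `ClientView`, `resolve`) is hypothetical, and the timeout value is arbitrary. It assumes a simplified log that only accepts the next expected sequence number, and shows how the client can report a timeout for a request the broker has in fact appended.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these types exist in Kafka. */
public class DeliveryTimeoutSketch {

    /** Simplified stand-in for a partition log that enforces in-order sequence numbers. */
    static final class ToyPartitionLog {
        private final List<String> records = new ArrayList<>();
        private int nextExpectedSequence = 0;

        /** Appends only if the sequence is the next expected one; returns the offset, or -1 if rejected. */
        long append(int sequence, String value) {
            if (sequence != nextExpectedSequence) {
                return -1L; // a gap or duplicate is rejected in this sketch
            }
            records.add(value);
            nextExpectedSequence++;
            return records.size() - 1;
        }

        int size() {
            return records.size();
        }
    }

    /** What the client believes about a request it has sent. */
    enum ClientView { PENDING, TIMED_OUT, ACKED }

    /** An ack means success; without one, the request expires once the delivery timeout has elapsed. */
    static ClientView resolve(boolean ackReceived, long elapsedMs, long deliveryTimeoutMs) {
        if (ackReceived) {
            return ClientView.ACKED;
        }
        return elapsedMs >= deliveryTimeoutMs ? ClientView.TIMED_OUT : ClientView.PENDING;
    }

    public static void main(String[] args) {
        ToyPartitionLog log = new ToyPartitionLog();
        long deliveryTimeoutMs = 1000L; // arbitrary value for the sketch

        // Request 1 (sequence 0) is appended, but its ack never reaches the client
        // before the delivery timeout elapses.
        long offset1 = log.append(0, "first");
        ClientView view1 = resolve(false, deliveryTimeoutMs, deliveryTimeoutMs);

        // Request 2 (sequence 1) was sent later, so it has not expired when its ack arrives.
        long offset2 = log.append(1, "second");
        ClientView view2 = resolve(offset2 >= 0, deliveryTimeoutMs / 2, deliveryTimeoutMs);

        System.out.println("request1: client=" + view1 + ", log offset=" + offset1);
        System.out.println("request2: client=" + view2 + ", log offset=" + offset2);
    }
}
```

   In this sketch the second append can only succeed because sequence 0 is already in the log, which mirrors the test comment that a successful second request implies the first was also written, even though the user has already seen a timeout for it.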


