Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/14 17:05:26 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

hachikuji opened a new pull request #10880:
URL: https://github.com/apache/kafka/pull/10880


   We had been using `RecordAccumulator.beginFlush` to force the `RecordAccumulator` to flush pending batches when a transaction was being completed. Internally, `RecordAccumulator` keeps a simple counter of the flushes in progress. The count is incremented in `beginFlush` and is expected to be decremented by `awaitFlushCompletion`. In the transactional path, `awaitFlushCompletion` was never called, so the counter could get stuck at a positive value, which meant that the linger time was effectively ignored.
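
   As a rough illustration of the failure mode described above, here is a simplified model of the counter. It is a sketch, not the actual `RecordAccumulator` code: the class name `FlushCounterSketch`, the `isSendable` method, and the 50 ms linger value are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the flush counter; not the real RecordAccumulator.
class FlushCounterSketch {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);
    private final long lingerMs;

    FlushCounterSketch(long lingerMs) {
        this.lingerMs = lingerMs;
    }

    // Plays the role of beginFlush: record that a flush has started.
    void beginFlush() {
        flushesInProgress.getAndIncrement();
    }

    // Plays the role of awaitFlushCompletion: the only place the counter is decremented.
    void awaitFlushCompletion() {
        flushesInProgress.decrementAndGet();
    }

    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }

    // A batch may be sent once it has lingered long enough, or while any flush is in progress.
    boolean isSendable(long waitedMs) {
        return waitedMs >= lingerMs || flushInProgress();
    }

    public static void main(String[] args) {
        FlushCounterSketch accumulator = new FlushCounterSketch(50);
        System.out.println(accumulator.isSendable(0)); // false: the linger time applies

        // Transactional path before the fix: beginFlush with no matching decrement.
        accumulator.beginFlush();
        System.out.println(accumulator.isSendable(0)); // true: the linger time is bypassed

        // Nothing ever calls awaitFlushCompletion here, so every later batch
        // is also treated as sendable immediately.
        System.out.println(accumulator.isSendable(0)); // true
    }
}
```

   In this model the linger check only becomes meaningful again once a matching `awaitFlushCompletion` call brings the counter back to zero.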
   
   The patch fixes the problem by removing the use of `beginFlush` in `Sender`. Instead, `RecordAccumulator` now has a condition that explicitly checks whether a transaction is being completed.
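
   A minimal sketch of the idea behind the fix, under the assumption that the accumulator can ask the transaction state whether it is completing. The names `TxnStateSketch`, `TxnAwareAccumulatorSketch`, and `isSendable` are hypothetical and only stand in for the real classes; the point is that the decision is derived from current state, so there is no counter left to clear.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the transaction state consulted by the accumulator.
class TxnStateSketch {
    private boolean completing = false;

    void beginCommit() {
        completing = true;
    }

    void complete() {
        completing = false;
    }

    boolean isCompleting() {
        return completing;
    }
}

// Hypothetical accumulator that checks the transaction state directly
// instead of relying on a flush counter.
class TxnAwareAccumulatorSketch {
    private final long lingerMs;
    private final BooleanSupplier transactionCompleting;

    TxnAwareAccumulatorSketch(long lingerMs, BooleanSupplier transactionCompleting) {
        this.lingerMs = lingerMs;
        this.transactionCompleting = transactionCompleting;
    }

    // Sendable when the linger time has elapsed or a transaction is being completed.
    boolean isSendable(long waitedMs) {
        return waitedMs >= lingerMs || transactionCompleting.getAsBoolean();
    }

    public static void main(String[] args) {
        TxnStateSketch txn = new TxnStateSketch();
        TxnAwareAccumulatorSketch accumulator = new TxnAwareAccumulatorSketch(50, txn::isCompleting);

        System.out.println(accumulator.isSendable(0)); // false: the linger time applies
        txn.beginCommit();
        System.out.println(accumulator.isSendable(0)); // true: commit forces the send
        txn.complete();
        System.out.println(accumulator.isSendable(0)); // false: the linger time applies again
    }
}
```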
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10880:
URL: https://github.com/apache/kafka/pull/10880#discussion_r654615386



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
         }
     }
 
+    @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       I remember we have similar utils in `TransactionManagerTest`. Could we consolidate them?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       Yup, I think that's also fine.







[GitHub] [kafka] hachikuji commented on pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10880:
URL: https://github.com/apache/kafka/pull/10880#issuecomment-860877998


   Note I have a few additional tests which I will post shortly.





[GitHub] [kafka] hachikuji merged pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10880:
URL: https://github.com/apache/kafka/pull/10880


   





[GitHub] [kafka] hachikuji commented on a change in pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10880:
URL: https://github.com/apache/kafka/pull/10880#discussion_r654624934



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       Would it be reasonable to add a helper to `Sender` for testing?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       Would it be reasonable to add a `runUntil` helper to `Sender` for testing?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       Decided to pull out a test util instead. Hopefully that is ok.
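
       For readers following the thread, a shared `runUntil` helper of the kind discussed here might look roughly like the sketch below. This is not the utility from the PR: the class name `RunUntilSketch`, the `Runnable step` parameter (standing in for a call such as `sender.runOnce()`), and the `maxTries` bound are assumptions made for illustration.

```java
import java.util.function.Supplier;

// Hypothetical test utility; the real helper extracted in the PR may differ.
class RunUntilSketch {

    // Repeatedly runs `step` until `condition` holds, giving up after `maxTries` steps.
    static void runUntil(Runnable step, Supplier<Boolean> condition, int maxTries) {
        int tries = 0;
        while (!condition.get() && tries < maxTries) {
            tries++;
            step.run();
        }
        if (!condition.get()) {
            throw new AssertionError("Condition not satisfied after " + maxTries + " tries");
        }
    }

    public static void main(String[] args) {
        int[] iterations = {0};
        // The lambda stands in for one iteration of the sender loop.
        runUntil(() -> iterations[0]++, () -> iterations[0] >= 3, 10);
        System.out.println(iterations[0]); // 3
    }
}
```

       Bounding the number of iterations keeps a test from hanging when the expected state is never reached; the failure then shows up as an assertion error instead of a timeout.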



