Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/18 22:52:25 UTC

[kafka] branch trunk updated: KAFKA-12870; Flush in progress not cleared after transaction completion (#10880)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 299eea8  KAFKA-12870; Flush in progress not cleared after transaction completion (#10880)
299eea8 is described below

commit 299eea88a5068f973dc055776c7137538ed01c62
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 18 15:50:49 2021 -0700

    KAFKA-12870; Flush in progress not cleared after transaction completion (#10880)
    
    We had been using `RecordAccumulator.beginFlush` to force the `RecordAccumulator` to flush pending batches when a transaction was being completed. Internally, `RecordAccumulator` keeps a simple counter of the number of flushes in progress. The counter is incremented in `beginFlush` and is expected to be decremented by `awaitFlushCompletion`. That decrementing call never happened in the transactional path, so the counter could get stuck at a positive value, [...]
    
    This patch fixes the problem by removing the use of `beginFlush` in `Sender`. Instead, `RecordAccumulator` now checks explicitly in its readiness logic whether a transaction is being completed.
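
    For illustration, a minimal sketch of the counter pattern described above
    (the method names match the real `RecordAccumulator`; the `flushesInProgress`
    field name and the method bodies are simplified assumptions):

        import java.util.concurrent.atomic.AtomicInteger;

        class FlushCounterSketch {
            private final AtomicInteger flushesInProgress = new AtomicInteger(0);

            // Incremented whenever a flush is requested.
            void beginFlush() {
                flushesInProgress.incrementAndGet();
            }

            // The only place the counter is decremented. The transactional path
            // called beginFlush() without ever reaching this method, so the
            // counter stayed positive.
            void awaitFlushCompletion() {
                try {
                    // ... wait for all incomplete batches to complete ...
                } finally {
                    flushesInProgress.decrementAndGet();
                }
            }

            // While this returns true, every batch is treated as sendable, so
            // the linger time is effectively ignored.
            boolean flushInProgress() {
                return flushesInProgress.get() > 0;
            }
        }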
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../producer/internals/RecordAccumulator.java      |   8 +-
 .../kafka/clients/producer/internals/Sender.java   |   8 --
 .../producer/internals/ProducerTestUtils.java      |  45 +++++++
 .../producer/internals/RecordAccumulatorTest.java  |  68 +++++++++-
 .../clients/producer/internals/SenderTest.java     | 142 ++++++++++++++++++++-
 .../producer/internals/TransactionManagerTest.java |  20 +--
 7 files changed, 259 insertions(+), 34 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dd3389a..b852a4d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -68,7 +68,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
+              files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
 
     <suppress checks="JavaNCSS"
               files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e51fad7..24a80b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -465,7 +465,13 @@ public final class RecordAccumulator {
                         long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                         boolean full = deque.size() > 1 || batch.isFull();
                         boolean expired = waitedTimeMs >= timeToWaitMs;
-                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
+                        boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
+                        boolean sendable = full
+                            || expired
+                            || exhausted
+                            || closed
+                            || flushInProgress()
+                            || transactionCompleting;
                         if (sendable && !backingOff) {
                             readyNodes.add(leader);
                         } else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8b31094..2f55e62 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -432,14 +432,6 @@ public class Sender implements Runnable {
             }
         }
 
-        if (transactionManager.isCompleting() && !accumulator.flushInProgress()) {
-            // There may still be requests left which are being retried. Since we do not know whether they had
-            // been successfully appended to the broker log, we must resend them until their final status is clear.
-            // If they had been appended and we did not receive the error, then our sequence number would no longer
-            // be correct which would lead to an OutOfSequenceException.
-            accumulator.beginFlush();
-        }
-
         TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
         if (nextRequestHandler == null)
             return false;
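
[Editor's note: a condensed restatement of the two hunks above, showing where the
completion-time flush decision moved. This is not code from the patch itself, just
the before/after conditions placed side by side.]

    // Before: Sender forced a flush when the transaction was completing,
    // incrementing the counter with no matching awaitFlushCompletion().
    if (transactionManager.isCompleting() && !accumulator.flushInProgress())
        accumulator.beginFlush();

    // After: RecordAccumulator.ready() consults the transaction state
    // directly, so no counter needs to be incremented or decremented.
    boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
    boolean sendable = full || expired || exhausted || closed
        || flushInProgress() || transactionCompleting;
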
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
new file mode 100644
index 0000000..a841033
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ProducerTestUtils {
+    private static final int MAX_TRIES = 10;
+
+    static void runUntil(
+        Sender sender,
+        Supplier<Boolean> condition
+    ) {
+        runUntil(sender, condition, MAX_TRIES);
+    }
+
+    static void runUntil(
+        Sender sender,
+        Supplier<Boolean> condition,
+        int maxTries
+    ) {
+        int tries = 0;
+        while (!condition.get() && tries < maxTries) {
+            tries++;
+            sender.runOnce();
+        }
+        assertTrue(condition.get(), "Condition not satisfied after " + maxTries + " tries");
+    }
+}
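
[Editor's note: the new helper drives the Sender event loop until a condition is
met. For reference, the SenderTest changes below invoke it like this, assuming the
test's usual `sender` and `client` fixtures are in scope:]

    // Keep calling sender.runOnce() until a request is in flight,
    // failing the test after MAX_TRIES iterations.
    ProducerTestUtils.runUntil(sender, client::hasInFlightRequests);
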
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index db4454d..06ed1ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -40,10 +40,12 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -695,7 +697,7 @@ public class RecordAccumulatorTest {
     }
 
     @Test
-    public void testIdempotenceWithOldMagic() throws InterruptedException {
+    public void testIdempotenceWithOldMagic() {
         // Simulate talking to an older broker, ie. one which supports a lower magic.
         ApiVersions apiVersions = new ApiVersions();
         int batchSize = 1025;
@@ -706,7 +708,7 @@ public class RecordAccumulatorTest {
         String metricGrpName = "producer-metrics";
 
         apiVersions.update("foobar", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
-        TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions());
+        TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, retryBackoffMs, apiVersions);
         RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
             CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
@@ -715,6 +717,52 @@ public class RecordAccumulatorTest {
     }
 
     @Test
+    public void testRecordsDrainedWhenTransactionCompleting() throws Exception {
+        int batchSize = 1025;
+        int deliveryTimeoutMs = 3200;
+        int lingerMs = 10;
+        long totalSize = 10 * batchSize;
+
+        TransactionManager transactionManager = Mockito.mock(TransactionManager.class);
+        RecordAccumulator accumulator = createTestRecordAccumulator(transactionManager, deliveryTimeoutMs,
+            batchSize, totalSize, CompressionType.NONE, lingerMs);
+
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(12345L, (short) 5);
+        Mockito.when(transactionManager.producerIdAndEpoch()).thenReturn(producerIdAndEpoch);
+        Mockito.when(transactionManager.isSendToPartitionAllowed(tp1)).thenReturn(true);
+        Mockito.when(transactionManager.isPartitionAdded(tp1)).thenReturn(true);
+        Mockito.when(transactionManager.firstInFlightSequence(tp1)).thenReturn(0);
+
+        // Initially, the transaction is still in progress, so we should respect the linger.
+        Mockito.when(transactionManager.isCompleting()).thenReturn(false);
+
+        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+            false, time.milliseconds());
+        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+            false, time.milliseconds());
+        assertTrue(accumulator.hasUndrained());
+
+        RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(cluster, time.milliseconds());
+        assertEquals(0, firstResult.readyNodes.size());
+        Map<Integer, List<ProducerBatch>> firstDrained = accumulator.drain(cluster, firstResult.readyNodes,
+            Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(0, firstDrained.size());
+
+        // Once the transaction begins completion, then the batch should be drained immediately.
+        Mockito.when(transactionManager.isCompleting()).thenReturn(true);
+
+        RecordAccumulator.ReadyCheckResult secondResult = accumulator.ready(cluster, time.milliseconds());
+        assertEquals(1, secondResult.readyNodes.size());
+        Node readyNode = secondResult.readyNodes.iterator().next();
+
+        Map<Integer, List<ProducerBatch>> secondDrained = accumulator.drain(cluster, secondResult.readyNodes,
+            Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(Collections.singleton(readyNode.id()), secondDrained.keySet());
+        List<ProducerBatch> batches = secondDrained.get(readyNode.id());
+        assertEquals(1, batches.size());
+    }
+
+    @Test
     public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
         long now = time.milliseconds();
         RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10);
@@ -1080,16 +1128,26 @@ public class RecordAccumulatorTest {
         }
     }
 
-
     private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) {
         int deliveryTimeoutMs = 3200;
         return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
     }
 
+    private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
+        return createTestRecordAccumulator(null, deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
+    }
+
     /**
      * Return a test RecordAccumulator instance
      */
-    private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
+    private RecordAccumulator createTestRecordAccumulator(
+        TransactionManager txnManager,
+        int deliveryTimeoutMs,
+        int batchSize,
+        long totalSize,
+        CompressionType type,
+        int lingerMs
+    ) {
         long retryBackoffMs = 100L;
         String metricGrpName = "producer-metrics";
 
@@ -1104,7 +1162,7 @@ public class RecordAccumulatorTest {
             metricGrpName,
             time,
             new ApiVersions(),
-            null,
+            txnManager,
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 26041cc..cfc720e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -108,6 +108,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.kafka.clients.producer.internals.ProducerTestUtils.runUntil;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1033,7 +1034,7 @@ public class SenderTest {
         TransactionManager transactionManager = createTransactionManager();
 
         // Retries once
-        setupWithTransactionState(transactionManager, false, null, true, 1);
+        setupWithTransactionState(transactionManager, false, null, true, 1, 0);
 
         // Init producer id/epoch
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
@@ -2671,6 +2672,130 @@ public class SenderTest {
     }
 
     @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    @Test
     public void testIncompleteTransactionAbortOnShutdown() {
         // create a sender with retries = 1
         int maxRetries = 1;
@@ -2984,11 +3109,15 @@ public class SenderTest {
     }
     
     private void setupWithTransactionState(TransactionManager transactionManager) {
-        setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE, 0);
+    }
+
+    private void setupWithTransactionState(TransactionManager transactionManager, int lingerMs) {
+        setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE, lingerMs);
     }
 
     private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
-        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
     }
 
     private void setupWithTransactionState(
@@ -2997,7 +3126,7 @@ public class SenderTest {
         BufferPool customPool,
         boolean updateMetadata
     ) {
-        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE, 0);
     }
 
     private void setupWithTransactionState(
@@ -3005,7 +3134,8 @@ public class SenderTest {
         boolean guaranteeOrder,
         BufferPool customPool,
         boolean updateMetadata,
-        int retries
+        int retries,
+        int lingerMs
     ) {
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
@@ -3013,7 +3143,7 @@ public class SenderTest {
         this.metrics = new Metrics(metricConfig, time);
         BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
 
-        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, lingerMs, 0L,
             DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index c5e3466..a93af9d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -18,21 +18,17 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -44,6 +40,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -67,13 +64,16 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -3570,13 +3570,7 @@ public class TransactionManagerTest {
     }
 
     private void runUntil(Supplier<Boolean> condition) {
-        for (int i = 0; i < 5; i++) {
-            if (condition.get())
-                break;
-            sender.runOnce();
-        }
-        if (!condition.get())
-            throw new AssertionError("Condition was not satisfied after multiple runs");
+        ProducerTestUtils.runUntil(sender, condition);
     }
 
 }