You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/18 22:52:25 UTC
[kafka] branch trunk updated: KAFKA-12870;
Flush in progress not cleared after transaction completion (#10880)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 299eea8 KAFKA-12870; Flush in progress not cleared after transaction completion (#10880)
299eea8 is described below
commit 299eea88a5068f973dc055776c7137538ed01c62
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 18 15:50:49 2021 -0700
KAFKA-12870; Flush in progress not cleared after transaction completion (#10880)
We had been using `RecordAccumulator.beginFlush` in order to force the `RecordAccumulator` to flush pending batches when a transaction was being completed. Internally, `RecordAccumulator` has a simple counter for the number of flushes in progress. The count gets incremented in `beginFlush` and it is expected to be decremented by `awaitFlushCompletion`. The second call to decrement the counter never happened in the transactional path, so the counter could get stuck at a positive value, [...]
This patch fixes the problem by removing the use of `beginFlush` in `Sender`. Instead, we now add an additional condition in `RecordAccumulator` to explicitly check when a transaction is being completed.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
checkstyle/suppressions.xml | 2 +-
.../producer/internals/RecordAccumulator.java | 8 +-
.../kafka/clients/producer/internals/Sender.java | 8 --
.../producer/internals/ProducerTestUtils.java | 45 +++++++
.../producer/internals/RecordAccumulatorTest.java | 68 +++++++++-
.../clients/producer/internals/SenderTest.java | 142 ++++++++++++++++++++-
.../producer/internals/TransactionManagerTest.java | 20 +--
7 files changed, 259 insertions(+), 34 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dd3389a..b852a4d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
- files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
+ files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e51fad7..24a80b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -465,7 +465,13 @@ public final class RecordAccumulator {
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
- boolean sendable = full || expired || exhausted || closed || flushInProgress();
+ boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
+ boolean sendable = full
+ || expired
+ || exhausted
+ || closed
+ || flushInProgress()
+ || transactionCompleting;
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8b31094..2f55e62 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -432,14 +432,6 @@ public class Sender implements Runnable {
}
}
- if (transactionManager.isCompleting() && !accumulator.flushInProgress()) {
- // There may still be requests left which are being retried. Since we do not know whether they had
- // been successfully appended to the broker log, we must resend them until their final status is clear.
- // If they had been appended and we did not receive the error, then our sequence number would no longer
- // be correct which would lead to an OutOfSequenceException.
- accumulator.beginFlush();
- }
-
TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
if (nextRequestHandler == null)
return false;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
new file mode 100644
index 0000000..a841033
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ProducerTestUtils {
+ private static final int MAX_TRIES = 10;
+
+ static void runUntil(
+ Sender sender,
+ Supplier<Boolean> condition
+ ) {
+ runUntil(sender, condition, MAX_TRIES);
+ }
+
+ static void runUntil(
+ Sender sender,
+ Supplier<Boolean> condition,
+ int maxTries
+ ) {
+ int tries = 0;
+ while (!condition.get() && tries < maxTries) {
+ tries++;
+ sender.runOnce();
+ }
+ assertTrue(condition.get(), "Condition not satisfied after " + maxTries + " tries");
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index db4454d..06ed1ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -40,10 +40,12 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -695,7 +697,7 @@ public class RecordAccumulatorTest {
}
@Test
- public void testIdempotenceWithOldMagic() throws InterruptedException {
+ public void testIdempotenceWithOldMagic() {
// Simulate talking to an older broker, ie. one which supports a lower magic.
ApiVersions apiVersions = new ApiVersions();
int batchSize = 1025;
@@ -706,7 +708,7 @@ public class RecordAccumulatorTest {
String metricGrpName = "producer-metrics";
apiVersions.update("foobar", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
- TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions());
+ TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, retryBackoffMs, apiVersions);
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
@@ -715,6 +717,52 @@ public class RecordAccumulatorTest {
}
@Test
+ public void testRecordsDrainedWhenTransactionCompleting() throws Exception {
+ int batchSize = 1025;
+ int deliveryTimeoutMs = 3200;
+ int lingerMs = 10;
+ long totalSize = 10 * batchSize;
+
+ TransactionManager transactionManager = Mockito.mock(TransactionManager.class);
+ RecordAccumulator accumulator = createTestRecordAccumulator(transactionManager, deliveryTimeoutMs,
+ batchSize, totalSize, CompressionType.NONE, lingerMs);
+
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(12345L, (short) 5);
+ Mockito.when(transactionManager.producerIdAndEpoch()).thenReturn(producerIdAndEpoch);
+ Mockito.when(transactionManager.isSendToPartitionAllowed(tp1)).thenReturn(true);
+ Mockito.when(transactionManager.isPartitionAdded(tp1)).thenReturn(true);
+ Mockito.when(transactionManager.firstInFlightSequence(tp1)).thenReturn(0);
+
+ // Initially, the transaction is still in progress, so we should respect the linger.
+ Mockito.when(transactionManager.isCompleting()).thenReturn(false);
+
+ accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+ false, time.milliseconds());
+ accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+ false, time.milliseconds());
+ assertTrue(accumulator.hasUndrained());
+
+ RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(cluster, time.milliseconds());
+ assertEquals(0, firstResult.readyNodes.size());
+ Map<Integer, List<ProducerBatch>> firstDrained = accumulator.drain(cluster, firstResult.readyNodes,
+ Integer.MAX_VALUE, time.milliseconds());
+ assertEquals(0, firstDrained.size());
+
+ // Once the transaction begins completion, then the batch should be drained immediately.
+ Mockito.when(transactionManager.isCompleting()).thenReturn(true);
+
+ RecordAccumulator.ReadyCheckResult secondResult = accumulator.ready(cluster, time.milliseconds());
+ assertEquals(1, secondResult.readyNodes.size());
+ Node readyNode = secondResult.readyNodes.iterator().next();
+
+ Map<Integer, List<ProducerBatch>> secondDrained = accumulator.drain(cluster, secondResult.readyNodes,
+ Integer.MAX_VALUE, time.milliseconds());
+ assertEquals(Collections.singleton(readyNode.id()), secondDrained.keySet());
+ List<ProducerBatch> batches = secondDrained.get(readyNode.id());
+ assertEquals(1, batches.size());
+ }
+
+ @Test
public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
long now = time.milliseconds();
RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10);
@@ -1080,16 +1128,26 @@ public class RecordAccumulatorTest {
}
}
-
private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) {
int deliveryTimeoutMs = 3200;
return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
}
+ private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
+ return createTestRecordAccumulator(null, deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
+ }
+
/**
* Return a test RecordAccumulator instance
*/
- private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
+ private RecordAccumulator createTestRecordAccumulator(
+ TransactionManager txnManager,
+ int deliveryTimeoutMs,
+ int batchSize,
+ long totalSize,
+ CompressionType type,
+ int lingerMs
+ ) {
long retryBackoffMs = 100L;
String metricGrpName = "producer-metrics";
@@ -1104,7 +1162,7 @@ public class RecordAccumulatorTest {
metricGrpName,
time,
new ApiVersions(),
- null,
+ txnManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 26041cc..cfc720e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -108,6 +108,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.kafka.clients.producer.internals.ProducerTestUtils.runUntil;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1033,7 +1034,7 @@ public class SenderTest {
TransactionManager transactionManager = createTransactionManager();
// Retries once
- setupWithTransactionState(transactionManager, false, null, true, 1);
+ setupWithTransactionState(transactionManager, false, null, true, 1, 0);
// Init producer id/epoch
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
@@ -2671,6 +2672,130 @@ public class SenderTest {
}
@Test
+ public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
+ try (Metrics m = new Metrics()) {
+ int lingerMs = 50;
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+ TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+ setupWithTransactionState(txnManager, lingerMs);
+
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+ 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+ // Begin a transaction and successfully add one partition to it.
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ // Send a couple records and assert that they are not sent immediately (due to linger).
+ appendToAccumulator(tp0);
+ appendToAccumulator(tp0);
+ sender.runOnce();
+ assertFalse(client.hasInFlightRequests());
+
+ // Now begin the commit and assert that the Produce request is sent immediately
+ // without waiting for the linger.
+ txnManager.beginCommit();
+ runUntil(sender, client::hasInFlightRequests);
+
+ // Respond to the produce request and wait for the EndTxn request to be sent.
+ respondToProduce(tp0, Errors.NONE, 1L);
+ runUntil(sender, txnManager::hasInFlightRequest);
+
+ // Respond to the expected EndTxn request.
+ respondToEndTxn(Errors.NONE);
+ runUntil(sender, txnManager::isReady);
+
+ // Finally, we want to assert that the linger time is still effective
+ // when the new transaction begins.
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ appendToAccumulator(tp0);
+ appendToAccumulator(tp0);
+ time.sleep(lingerMs - 1);
+ sender.runOnce();
+ assertFalse(client.hasInFlightRequests());
+ assertTrue(accumulator.hasUndrained());
+
+ time.sleep(1);
+ runUntil(sender, client::hasInFlightRequests);
+ assertFalse(accumulator.hasUndrained());
+ }
+ }
+
+ @Test
+ public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
+ try (Metrics m = new Metrics()) {
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+ TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+ setupWithTransactionState(txnManager);
+
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+ 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+ // Begin a transaction and successfully add one partition to it.
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ // Send one Produce request.
+ appendToAccumulator(tp0);
+ runUntil(sender, () -> client.requests().size() == 1);
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(client.hasInFlightRequests());
+ assertTrue(txnManager.hasInflightBatches(tp0));
+
+ // Enqueue another record and then commit the transaction. We expect the unsent record to
+ // get sent before the transaction can be completed.
+ appendToAccumulator(tp0);
+ txnManager.beginCommit();
+ runUntil(sender, () -> client.requests().size() == 2);
+
+ assertTrue(txnManager.isCompleting());
+ assertFalse(txnManager.hasInFlightRequest());
+ assertTrue(txnManager.hasInflightBatches(tp0));
+
+ // Now respond to the pending Produce requests.
+ respondToProduce(tp0, Errors.NONE, 0L);
+ respondToProduce(tp0, Errors.NONE, 1L);
+ runUntil(sender, txnManager::hasInFlightRequest);
+
+ // Finally, respond to the expected EndTxn request.
+ respondToEndTxn(Errors.NONE);
+ runUntil(sender, txnManager::isReady);
+ }
+ }
+
+ private void addPartitionToTxn(Sender sender, TransactionManager txnManager, TopicPartition tp) {
+ txnManager.maybeAddPartitionToTransaction(tp);
+ client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+ runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+ assertFalse(txnManager.hasInFlightRequest());
+ }
+
+ private void respondToProduce(TopicPartition tp, Errors error, long offset) {
+ client.respond(
+ request -> request instanceof ProduceRequest,
+ produceResponse(tp, offset, error, 0)
+ );
+
+ }
+
+ private void respondToEndTxn(Errors error) {
+ client.respond(
+ request -> request instanceof EndTxnRequest,
+ new EndTxnResponse(new EndTxnResponseData()
+ .setErrorCode(error.code())
+ .setThrottleTimeMs(0))
+ );
+ }
+
+ @Test
public void testIncompleteTransactionAbortOnShutdown() {
// create a sender with retries = 1
int maxRetries = 1;
@@ -2984,11 +3109,15 @@ public class SenderTest {
}
private void setupWithTransactionState(TransactionManager transactionManager) {
- setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE);
+ setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE, 0);
+ }
+
+ private void setupWithTransactionState(TransactionManager transactionManager, int lingerMs) {
+ setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE, lingerMs);
}
private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
- setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE);
+ setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
}
private void setupWithTransactionState(
@@ -2997,7 +3126,7 @@ public class SenderTest {
BufferPool customPool,
boolean updateMetadata
) {
- setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE);
+ setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE, 0);
}
private void setupWithTransactionState(
@@ -3005,7 +3134,8 @@ public class SenderTest {
boolean guaranteeOrder,
BufferPool customPool,
boolean updateMetadata,
- int retries
+ int retries,
+ int lingerMs
) {
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
@@ -3013,7 +3143,7 @@ public class SenderTest {
this.metrics = new Metrics(metricConfig, time);
BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
- this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
+ this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, lingerMs, 0L,
DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index c5e3466..a93af9d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -18,21 +18,17 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -44,6 +40,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -67,13 +64,16 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -3570,13 +3570,7 @@ public class TransactionManagerTest {
}
private void runUntil(Supplier<Boolean> condition) {
- for (int i = 0; i < 5; i++) {
- if (condition.get())
- break;
- sender.runOnce();
- }
- if (!condition.get())
- throw new AssertionError("Condition was not satisfied after multiple runs");
+ ProducerTestUtils.runUntil(sender, condition);
}
}