Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/02 21:00:23 UTC

[kafka] branch 2.2 updated: KAFKA-8248; Ensure time updated before sending transactional request (#6613)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 98e440c  KAFKA-8248; Ensure time updated before sending transactional request (#6613)
98e440c is described below

commit 98e440c5b5ec0e1c742fc43eeeb846d11f602f28
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 2 09:29:22 2019 -0700

    KAFKA-8248; Ensure time updated before sending transactional request (#6613)
    
    This patch fixes a bug in the sending of transactional requests. We need to call `KafkaClient.send` with an updated current time. Failing to do so can result in an `IllegalStateException` which leaves the producer effectively dead, since the in-flight correlation id has been set but no request has been sent. To avoid the same problem in the future, we update the in-flight correlation id only after sending the request.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/ClusterConnectionStates.java     |   2 +-
 .../kafka/clients/producer/internals/Sender.java   |  28 +-
 .../producer/internals/TransactionManager.java     |   2 +-
 .../java/org/apache/kafka/clients/MockClient.java  | 195 ++++++----
 .../kafka/clients/producer/KafkaProducerTest.java  |  32 ++
 .../clients/producer/internals/SenderTest.java     | 416 ++++++++++-----------
 .../producer/internals/TransactionManagerTest.java | 341 +++++++++--------
 .../util/ReplicaFetcherMockBlockingSend.scala      |  16 +-
 8 files changed, 556 insertions(+), 476 deletions(-)
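
The heart of the change is an ordering fix in Sender#maybeSendTransactionalRequest, visible in the diff below. The sketch that follows distills it using hypothetical stand-in types (SketchClient and SketchTransactionManager are illustrative only, not the real KafkaClient/TransactionManager API): the current time is re-read after any retry-backoff sleep, and the in-flight correlation id is recorded only after the send has succeeded, so a send that throws no longer leaves the producer waiting for a response that was never requested.

    // Hypothetical stand-ins; the real classes are KafkaClient and TransactionManager.
    interface SketchClient {
        // Throws IllegalStateException if the destination is not ready at nowMs.
        void send(Object request, long nowMs);
    }

    class SketchTransactionManager {
        private int inFlightCorrelationId = -1;

        void setInFlightCorrelationId(int correlationId) {
            this.inFlightCorrelationId = correlationId;
        }
    }

    class SendOrderingSketch {
        void sendTransactionalRequest(SketchClient client, SketchTransactionManager txnManager,
                                      Object request, int correlationId,
                                      boolean isRetry, long retryBackoffMs) throws InterruptedException {
            if (isRetry)
                Thread.sleep(retryBackoffMs);                    // time advances during the backoff...
            long currentTimeMs = System.currentTimeMillis();     // ...so take a fresh timestamp (the fix)
            client.send(request, currentTimeMs);                 // if this throws, nothing below runs, so...
            txnManager.setInFlightCorrelationId(correlationId);  // ...no phantom in-flight request is recorded
        }
    }

The old ordering did the reverse: it stamped the request with the stale `now` from the top of the run loop and set the correlation id before calling send, so an IllegalStateException thrown by send left the transaction manager convinced a request was in flight when none had been sent.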

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index e9bd971..0d33483 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -89,8 +89,8 @@ final class ClusterConnectionStates {
     public long connectionDelay(String id, long now) {
         NodeConnectionState state = nodeState.get(id);
         if (state == null) return 0;
-        long timeWaited = now - state.lastConnectAttemptMs;
         if (state.state.isDisconnected()) {
+            long timeWaited = now - state.lastConnectAttemptMs;
             return Math.max(state.reconnectBackoffMs - timeWaited, 0);
         } else {
             // When connecting or connected, we should be able to delay indefinitely since other events (connection or
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c1fc40b..30815c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -232,7 +232,7 @@ public class Sender implements Runnable {
         // main loop, runs until close is called
         while (running) {
             try {
-                run(time.milliseconds());
+                runOnce();
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
@@ -245,7 +245,7 @@ public class Sender implements Runnable {
         // wait until these are completed.
         while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
             try {
-                run(time.milliseconds());
+                runOnce();
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
@@ -268,14 +268,14 @@ public class Sender implements Runnable {
     /**
      * Run a single iteration of sending
      *
-     * @param now The current POSIX time in milliseconds
      */
-    void run(long now) {
+    void runOnce() {
         if (transactionManager != null) {
             try {
                 if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                     // Check if the previous run expired batches which requires a reset of the producer state.
                     transactionManager.resetProducerId();
+
                 if (!transactionManager.isTransactional()) {
                     // this is an idempotent producer, so make sure we have a producer id
                     maybeWaitForProducerId();
@@ -283,9 +283,9 @@ public class Sender implements Runnable {
                     transactionManager.transitionToFatalError(
                         new KafkaException("The client hasn't received acknowledgment for " +
                             "some previously sent messages and can no longer retry them. It isn't safe to continue."));
-                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
+                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
                     // as long as there are outstanding transactional requests, we simply wait for them to return
-                    client.poll(retryBackoffMs, now);
+                    client.poll(retryBackoffMs, time.milliseconds());
                     return;
                 }
 
@@ -295,7 +295,7 @@ public class Sender implements Runnable {
                     RuntimeException lastError = transactionManager.lastError();
                     if (lastError != null)
                         maybeAbortBatches(lastError);
-                    client.poll(retryBackoffMs, now);
+                    client.poll(retryBackoffMs, time.milliseconds());
                     return;
                 } else if (transactionManager.hasAbortableError()) {
                     accumulator.abortUndrainedBatches(transactionManager.lastError());
@@ -307,8 +307,9 @@ public class Sender implements Runnable {
             }
         }
 
-        long pollTimeout = sendProducerData(now);
-        client.poll(pollTimeout, now);
+        long currentTimeMs = time.milliseconds();
+        long pollTimeout = sendProducerData(currentTimeMs);
+        client.poll(pollTimeout, currentTimeMs);
     }
 
     private long sendProducerData(long now) {
@@ -392,7 +393,7 @@ public class Sender implements Runnable {
         return pollTimeout;
     }
 
-    private boolean maybeSendTransactionalRequest(long now) {
+    private boolean maybeSendTransactionalRequest() {
         if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
             if (transactionManager.isAborting())
                 accumulator.abortUndrainedBatches(new KafkaException("Failing batch since transaction was aborted"));
@@ -429,11 +430,12 @@ public class Sender implements Runnable {
                 if (targetNode != null) {
                     if (nextRequestHandler.isRetry())
                         time.sleep(nextRequestHandler.retryBackoffMs());
+                    long currentTimeMs = time.milliseconds();
                     ClientRequest clientRequest = client.newClientRequest(
-                        targetNode.idString(), requestBuilder, now, true, requestTimeoutMs, nextRequestHandler);
-                    transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
+                        targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
                     log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
-                    client.send(clientRequest, now);
+                    client.send(clientRequest, currentTimeMs);
+                    transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
                     return true;
                 }
             } catch (IOException e) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index dd884c6..809e914 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -661,7 +661,7 @@ public class TransactionManager {
         lookupCoordinator(request.coordinatorType(), request.coordinatorKey());
     }
 
-    void setInFlightTransactionalRequestCorrelationId(int correlationId) {
+    void setInFlightCorrelationId(int correlationId) {
         inFlightRequestCorrelationId = correlationId;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 0dd42f8..eba279d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -30,7 +30,6 @@ import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -74,15 +73,7 @@ public class MockClient implements KafkaClient {
     private int correlation;
     private final Time time;
     private final MockMetadataUpdater metadataUpdater;
-    private final Set<String> ready = new HashSet<>();
-
-    // Nodes awaiting reconnect backoff, will not be chosen by leastLoadedNode
-    private final TransientSet<Node> blackedOut;
-    // Nodes which will always fail to connect, but can be chosen by leastLoadedNode
-    private final TransientSet<Node> unreachable;
-    // Nodes which have a delay before ultimately succeeding to connect
-    private final TransientSet<Node> delayedReady;
-
+    private final Map<String, ConnectionState> connections = new HashMap<>();
     private final Map<Node, Long> pendingAuthenticationErrors = new HashMap<>();
     private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from a different thread
@@ -102,36 +93,30 @@ public class MockClient implements KafkaClient {
     public MockClient(Time time, MockMetadataUpdater metadataUpdater) {
         this.time = time;
         this.metadataUpdater = metadataUpdater;
-        this.blackedOut = new TransientSet<>(time);
-        this.unreachable = new TransientSet<>(time);
-        this.delayedReady = new TransientSet<>(time);
+    }
+
+    private ConnectionState connectionState(String idString) {
+        ConnectionState connectionState = connections.get(idString);
+        if (connectionState == null) {
+            connectionState = new ConnectionState();
+            connections.put(idString, connectionState);
+        }
+        return connectionState;
     }
 
     @Override
     public boolean isReady(Node node, long now) {
-        return ready.contains(node.idString());
+        return connectionState(node.idString()).isReady(now);
     }
 
     @Override
     public boolean ready(Node node, long now) {
-        if (blackedOut.contains(node, now))
-            return false;
-
-        if (unreachable.contains(node, now)) {
-            blackout(node, 100);
-            return false;
-        }
-
-        if (delayedReady.contains(node, now))
-            return false;
-
-        ready.add(node.idString());
-        return true;
+        return connectionState(node.idString()).ready(now);
     }
 
     @Override
     public long connectionDelay(Node node, long now) {
-        return blackedOut.expirationDelayMs(node, now);
+        return connectionState(node.idString()).connectionDelay(now);
     }
 
     @Override
@@ -140,16 +125,20 @@ public class MockClient implements KafkaClient {
     }
 
     public void blackout(Node node, long durationMs) {
-        blackedOut.add(node, durationMs);
+        connectionState(node.idString()).backoff(time.milliseconds() + durationMs);
     }
 
     public void setUnreachable(Node node, long durationMs) {
         disconnect(node.idString());
-        unreachable.add(node, durationMs);
+        connectionState(node.idString()).setUnreachable(time.milliseconds() + durationMs);
+    }
+
+    public void throttle(Node node, long durationMs) {
+        connectionState(node.idString()).throttle(time.milliseconds() + durationMs);
     }
 
     public void delayReady(Node node, long durationMs) {
-        delayedReady.add(node, durationMs);
+        connectionState(node.idString()).setReadyDelayed(time.milliseconds() + durationMs);
     }
 
     public void authenticationFailed(Node node, long blackoutMs) {
@@ -165,7 +154,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public boolean connectionFailed(Node node) {
-        return blackedOut.contains(node);
+        return connectionState(node.idString()).isBackingOff(time.milliseconds());
     }
 
     @Override
@@ -186,11 +175,14 @@ public class MockClient implements KafkaClient {
                 iter.remove();
             }
         }
-        ready.remove(node);
+        connectionState(node).disconnect();
     }
 
     @Override
     public void send(ClientRequest request, long now) {
+        if (!connectionState(request.destination()).isReady(now))
+            throw new IllegalStateException("Cannot send " + request + " since the destination is not ready");
+
         // Check if the request is directed to a node with a pending authentication error.
         for (Iterator<Map.Entry<Node, Long>> authErrorIter =
              pendingAuthenticationErrors.entrySet().iterator(); authErrorIter.hasNext(); ) {
@@ -436,9 +428,7 @@ public class MockClient implements KafkaClient {
     }
 
     public void reset() {
-        ready.clear();
-        blackedOut.clear();
-        unreachable.clear();
+        connections.clear();
         requests.clear();
         responses.clear();
         futureResponses.clear();
@@ -498,7 +488,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public boolean hasReadyNodes(long now) {
-        return !ready.isEmpty();
+        return connections.values().stream().anyMatch(cxn -> cxn.isReady(now));
     }
 
     @Override
@@ -536,14 +526,14 @@ public class MockClient implements KafkaClient {
 
     @Override
     public void close(String node) {
-        ready.remove(node);
+        connections.remove(node);
     }
 
     @Override
     public Node leastLoadedNode(long now) {
         // Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
         for (Node node : metadataUpdater.fetchNodes()) {
-            if (!blackedOut.contains(node, now))
+            if (!connectionState(node.idString()).isBackingOff(now))
                 return node;
         }
         return null;
@@ -579,45 +569,6 @@ public class MockClient implements KafkaClient {
         }
     }
 
-    private static class TransientSet<T> {
-        // The elements in the set mapped to their expiration timestamps
-        private final Map<T, Long> elements = new HashMap<>();
-        private final Time time;
-
-        private TransientSet(Time time) {
-            this.time = time;
-        }
-
-        boolean contains(T element) {
-            return contains(element, time.milliseconds());
-        }
-
-        boolean contains(T element, long now) {
-            return expirationDelayMs(element, now) > 0;
-        }
-
-        void add(T element, long durationMs) {
-            elements.put(element, time.milliseconds() + durationMs);
-        }
-
-        long expirationDelayMs(T element, long now) {
-            Long expirationTimeMs = elements.get(element);
-            if (expirationTimeMs == null) {
-                return 0;
-            } else if (now > expirationTimeMs) {
-                elements.remove(element);
-                return 0;
-            } else {
-                return expirationTimeMs - now;
-            }
-        }
-
-        void clear() {
-            elements.clear();
-        }
-
-    }
-
     /**
      * This is a dumbed down version of {@link MetadataUpdater} which is used to facilitate
      * metadata tracking primarily in order to serve {@link KafkaClient#leastLoadedNode(long)}
@@ -678,4 +629,92 @@ public class MockClient implements KafkaClient {
         }
     }
 
+    private static class ConnectionState {
+        enum State { CONNECTING, CONNECTED, DISCONNECTED }
+
+        private long throttledUntilMs = 0L;
+        private long readyDelayedUntilMs = 0L;
+        private long backingOffUntilMs = 0L;
+        private long unreachableUntilMs = 0L;
+        private State state = State.DISCONNECTED;
+
+        void backoff(long untilMs) {
+            backingOffUntilMs = untilMs;
+        }
+
+        void throttle(long untilMs) {
+            throttledUntilMs = untilMs;
+        }
+
+        void setUnreachable(long untilMs) {
+            unreachableUntilMs = untilMs;
+        }
+
+        void setReadyDelayed(long untilMs) {
+            readyDelayedUntilMs = untilMs;
+        }
+
+        boolean isReady(long now) {
+            return state == State.CONNECTED && notThrottled(now);
+        }
+
+        boolean isReadyDelayed(long now) {
+            return now < readyDelayedUntilMs;
+        }
+
+        boolean notThrottled(long now) {
+            return now > throttledUntilMs;
+        }
+
+        boolean isBackingOff(long now) {
+            return now < backingOffUntilMs;
+        }
+
+        boolean isUnreachable(long now) {
+            return now < unreachableUntilMs;
+        }
+
+        void disconnect() {
+            state = State.DISCONNECTED;
+        }
+
+        long connectionDelay(long now) {
+            if (state != State.DISCONNECTED)
+                return Long.MAX_VALUE;
+
+            if (backingOffUntilMs > now)
+                return backingOffUntilMs - now;
+
+            return 0;
+        }
+
+        boolean ready(long now) {
+            switch (state) {
+                case CONNECTED:
+                    return notThrottled(now);
+
+                case CONNECTING:
+                    if (isReadyDelayed(now))
+                        return false;
+                    state = State.CONNECTED;
+                    return ready(now);
+
+                case DISCONNECTED:
+                    if (isBackingOff(now)) {
+                        return false;
+                    } else if (isUnreachable(now)) {
+                        backingOffUntilMs = now + 100;
+                        return false;
+                    }
+
+                    state = State.CONNECTING;
+                    return ready(now);
+
+                default:
+                    throw new IllegalArgumentException("Invalid state: " + state);
+            }
+        }
+
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index f3faf26..0515099 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
@@ -648,6 +650,36 @@ public class KafkaProducerTest {
         }
     }
 
+    @Test
+    public void testInitTransactionWhileThrottled() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        Node node = metadata.fetch().nodes().get(0);
+        client.throttle(node, 5000);
+
+        client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, new Node(0, "host1", 1000)));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+        }
+    }
+
+    private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
+        return new InitProducerIdResponse(0, error, producerId, producerEpoch);
+    }
+
     @Test(expected = KafkaException.class)
     public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
         Map<String, Object> configs = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 1acdfed..2122646 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -142,17 +142,17 @@ public class SenderTest {
         long offset = 0;
         Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
                 null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
+        sender.runOnce(); // connect
+        sender.runOnce(); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
         assertTrue(client.hasInFlightRequests());
         client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals("All requests completed.", 0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
         assertFalse(client.hasInFlightRequests());
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue("Request should be completed", future.isDone());
         assertEquals(offset, future.get().offset());
     }
@@ -188,8 +188,8 @@ public class SenderTest {
             }
         }, produceResponse(tp0, offset, Errors.NONE, 0));
 
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
+        sender.runOnce(); // connect
+        sender.runOnce(); // send produce request
 
         assertTrue("Request should be completed", future.isDone());
         assertEquals(offset, future.get().offset());
@@ -245,8 +245,8 @@ public class SenderTest {
             }
         }, produceResponse);
 
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
+        sender.runOnce(); // connect
+        sender.runOnce(); // send produce request
 
         assertTrue("Request should be completed", future1.isDone());
         assertTrue("Request should be completed", future2.isDone());
@@ -311,10 +311,10 @@ public class SenderTest {
 
         // Append a message so that topic metrics are created
         accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
+        sender.runOnce(); // connect
+        sender.runOnce(); // send produce request
         client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         // Create throttle time metrics
         Sender.throttleTimeSensor(metricsRegistry);
 
@@ -338,46 +338,46 @@ public class SenderTest {
                     maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-            sender.run(time.milliseconds()); // connect
-            sender.run(time.milliseconds()); // send produce request
+            sender.runOnce(); // connect
+            sender.runOnce(); // send produce request
             String id = client.requests().peek().destination();
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
             assertEquals(1, sender.inFlightBatches(tp0).size());
-            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+            assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
             client.disconnect(id);
             assertEquals(0, client.inFlightRequestCount());
             assertFalse(client.hasInFlightRequests());
-            assertFalse("Client ready status should be false", client.isReady(node, 0L));
+            assertFalse("Client ready status should be false", client.isReady(node, time.milliseconds()));
             // the batch is in accumulator.inFlightBatches until it expires
             assertEquals(1, sender.inFlightBatches(tp0).size());
-            sender.run(time.milliseconds()); // receive error
-            sender.run(time.milliseconds()); // reconnect
-            sender.run(time.milliseconds()); // resend
+            sender.runOnce(); // receive error
+            sender.runOnce(); // reconnect
+            sender.runOnce(); // resend
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
             assertEquals(1, sender.inFlightBatches(tp0).size());
             long offset = 0;
             client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
-            sender.run(time.milliseconds());
+            sender.runOnce();
             assertTrue("Request should have retried and completed", future.isDone());
             assertEquals(offset, future.get().offset());
             assertEquals(0, sender.inFlightBatches(tp0).size());
 
             // do an unsuccessful retry
             future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-            sender.run(time.milliseconds()); // send produce request
+            sender.runOnce(); // send produce request
             assertEquals(1, sender.inFlightBatches(tp0).size());
             for (int i = 0; i < maxRetries + 1; i++) {
                 client.disconnect(client.requests().peek().destination());
-                sender.run(time.milliseconds()); // receive error
+                sender.runOnce(); // receive error
                 assertEquals(0, sender.inFlightBatches(tp0).size());
-                sender.run(time.milliseconds()); // reconnect
-                sender.run(time.milliseconds()); // resend
+                sender.runOnce(); // reconnect
+                sender.runOnce(); // resend
                 assertEquals(i > 0 ? 0 : 1, sender.inFlightBatches(tp0).size());
             }
-            sender.run(time.milliseconds());
+            sender.runOnce();
             assertFutureFailure(future, NetworkException.class);
             assertEquals(0, sender.inFlightBatches(tp0).size());
         } finally {
@@ -401,14 +401,14 @@ public class SenderTest {
             // Send the first message.
             TopicPartition tp2 = new TopicPartition("test", 1);
             accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
-            sender.run(time.milliseconds()); // connect
-            sender.run(time.milliseconds()); // send produce request
+            sender.runOnce(); // connect
+            sender.runOnce(); // send produce request
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
-            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+            assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
             assertEquals(1, sender.inFlightBatches(tp2).size());
 
             time.sleep(900);
@@ -420,7 +420,7 @@ public class SenderTest {
             client.prepareMetadataUpdate(metadataUpdate2);
             // Sender should not send the second message to node 0.
             assertEquals(1, sender.inFlightBatches(tp2).size());
-            sender.run(time.milliseconds());  // receive the response for the previous send, and send the new batch
+            sender.runOnce();  // receive the response for the previous send, and send the new batch
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
             assertEquals(1, sender.inFlightBatches(tp2).size());
@@ -468,7 +468,7 @@ public class SenderTest {
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
-        sender.run(time.milliseconds());  // We should try to flush the batch, but we expire it instead without sending anything.
+        sender.runOnce();  // We should try to flush the batch, but we expire it instead without sending anything.
         assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
         assertNull("Unexpected exception", unexpectedException.get());
         // Make sure that the records were appended back to the batch.
@@ -487,16 +487,16 @@ public class SenderTest {
         client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
 
         Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
         client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
-        sender.run(time.milliseconds());  // send produce request
+        sender.runOnce();  // send produce request
         client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
         assertEquals(0, sender.inFlightBatches(tp0).size());
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue("Request should be completed", future.isDone());
 
         assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp0.topic()));
@@ -504,16 +504,16 @@ public class SenderTest {
         client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
         assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
         future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
         client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
-        sender.run(time.milliseconds());  // send produce request
+        sender.runOnce();  // send produce request
         client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
         assertEquals(0, sender.inFlightBatches(tp0).size());
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue("Request should be completed", future.isDone());
     }
 
@@ -546,14 +546,14 @@ public class SenderTest {
     public void testCanRetryWithoutIdempotence() throws Exception {
         // do a successful retry
         Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
+        sender.runOnce(); // connect
+        sender.runOnce(); // send produce request
         String id = client.requests().peek().destination();
         Node node = new Node(Integer.parseInt(id), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertTrue(client.hasInFlightRequests());
         assertEquals(1, sender.inFlightBatches(tp0).size());
-        assertTrue("Client ready status should be true", client.isReady(node, 0L));
+        assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
         assertFalse(future.isDone());
 
         client.respond(new MockClient.RequestMatcher() {
@@ -564,7 +564,7 @@ public class SenderTest {
                 return true;
             }
         }, produceResponse(tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(future.isDone());
         try {
             future.get();
@@ -585,7 +585,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
@@ -594,7 +594,7 @@ public class SenderTest {
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -604,7 +604,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
 
-        sender.run(time.milliseconds()); // receive response 0
+        sender.runOnce(); // receive response 0
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
@@ -613,7 +613,7 @@ public class SenderTest {
         assertFalse(request2.isDone());
 
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
-        sender.run(time.milliseconds()); // receive response 1
+        sender.runOnce(); // receive response 1
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertFalse(client.hasInFlightRequests());
         assertEquals(0, sender.inFlightBatches(tp0).size());
@@ -635,7 +635,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
@@ -644,11 +644,11 @@ public class SenderTest {
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
          // Send third ProduceRequest
         Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertEquals(3, client.inFlightRequestCount());
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
@@ -659,7 +659,7 @@ public class SenderTest {
         assertTrue(client.isReady(node, time.milliseconds()));
 
         sendIdempotentProducerResponse(0, tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
-        sender.run(time.milliseconds()); // receive response 0
+        sender.runOnce(); // receive response 0
 
         // Queue the fourth request, it shouldn't be sent until the first 3 complete.
         Future<RecordMetadata> request4 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -668,15 +668,15 @@ public class SenderTest {
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
-        sender.run(time.milliseconds()); // re send request 1, receive response 2
+        sender.runOnce(); // re send request 1, receive response 2
 
         sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
-        sender.run(time.milliseconds()); // receive response 3
+        sender.runOnce(); // receive response 3
 
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
         assertEquals(1, client.inFlightRequestCount());
 
-        sender.run(time.milliseconds()); // Do nothing, we are reduced to one in flight request during retries.
+        sender.runOnce(); // Do nothing, we are reduced to one in flight request during retries.
 
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());  // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented.
         assertEquals(1, client.inFlightRequestCount());
@@ -684,19 +684,19 @@ public class SenderTest {
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
-        sender.run(time.milliseconds());  // receive response 1
+        sender.runOnce();  // receive response 1
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
         assertFalse(client.hasInFlightRequests());
         assertEquals(0, sender.inFlightBatches(tp0).size());
 
-        sender.run(time.milliseconds()); // send request 2;
+        sender.runOnce(); // send request 2;
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
-        sender.run(time.milliseconds());  // receive response 2
+        sender.runOnce();  // receive response 2
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
@@ -704,12 +704,12 @@ public class SenderTest {
         assertFalse(client.hasInFlightRequests());
         assertEquals(0, sender.inFlightBatches(tp0).size());
 
-        sender.run(time.milliseconds()); // send request 3
+        sender.runOnce(); // send request 3
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
-        sender.run(time.milliseconds());  // receive response 3, send request 4 since we are out of 'retry' mode.
+        sender.runOnce();  // receive response 3, send request 4 since we are out of 'retry' mode.
         assertEquals(2, transactionManager.lastAckedSequence(tp0));
         assertTrue(request3.isDone());
         assertEquals(2, request3.get().offset());
@@ -717,7 +717,7 @@ public class SenderTest {
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
-        sender.run(time.milliseconds());  // receive response 4
+        sender.runOnce();  // receive response 4
         assertEquals(3, transactionManager.lastAckedSequence(tp0));
         assertTrue(request4.isDone());
         assertEquals(3, request4.get().offset());
@@ -735,7 +735,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
@@ -744,7 +744,7 @@ public class SenderTest {
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -754,7 +754,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(0, tp0, Errors.MESSAGE_TOO_LARGE, -1L);
 
-        sender.run(time.milliseconds()); // receive response 0, should adjust sequences of future batches.
+        sender.runOnce(); // receive response 0, should adjust sequences of future batches.
         assertFutureFailure(request1, RecordTooLargeException.class);
 
         assertEquals(1, client.inFlightRequestCount());
@@ -762,19 +762,19 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
 
-        sender.run(time.milliseconds()); // receive response 1
+        sender.runOnce(); // receive response 1
 
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
 
-        sender.run(time.milliseconds()); // resend request 1
+        sender.runOnce(); // resend request 1
 
         assertEquals(1, client.inFlightRequestCount());
 
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
-        sender.run(time.milliseconds());  // receive response 1
+        sender.runOnce();  // receive response 1
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
 
@@ -795,7 +795,7 @@ public class SenderTest {
         // Send first ProduceRequest with multiple messages.
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
@@ -805,11 +805,11 @@ public class SenderTest {
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0);
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
@@ -821,7 +821,7 @@ public class SenderTest {
         // This OutOfOrderSequence is fatal since it is returned for the batch succeeding the last acknowledged batch.
         sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFutureFailure(request2, OutOfOrderSequenceException.class);
     }
 
@@ -836,7 +836,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
@@ -845,7 +845,7 @@ public class SenderTest {
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -858,7 +858,7 @@ public class SenderTest {
 
         client.respondToRequest(secondClientRequest, produceResponse(tp0, -1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1));
 
-        sender.run(time.milliseconds()); // receive response 1
+        sender.runOnce(); // receive response 1
         Deque<ProducerBatch> queuedBatches = accumulator.batches().get(tp0);
 
         // Make sure that we are queueing the second batch first.
@@ -869,7 +869,7 @@ public class SenderTest {
 
         client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1));
 
-        sender.run(time.milliseconds()); // receive response 0
+        sender.runOnce(); // receive response 0
 
         // Make sure we requeued both batches in the correct order.
         assertEquals(2, queuedBatches.size());
@@ -880,25 +880,25 @@ public class SenderTest {
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
 
-        sender.run(time.milliseconds()); // send request 0
+        sender.runOnce(); // send request 0
         assertEquals(1, client.inFlightRequestCount());
-        sender.run(time.milliseconds()); // don't do anything, only one inflight allowed once we are retrying.
+        sender.runOnce(); // don't do anything, only one inflight allowed once we are retrying.
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
 
         // Make sure that the requests are sent in order, even though the previous responses were not in order.
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
-        sender.run(time.milliseconds());  // receive response 0
+        sender.runOnce();  // receive response 0
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
 
-        sender.run(time.milliseconds()); // send request 1
+        sender.runOnce(); // send request 1
         assertEquals(1, client.inFlightRequestCount());
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
-        sender.run(time.milliseconds());  // receive response 1
+        sender.runOnce();  // receive response 1
 
         assertFalse(client.hasInFlightRequests());
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
@@ -918,14 +918,14 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
@@ -936,7 +936,7 @@ public class SenderTest {
 
         client.respondToRequest(secondClientRequest, produceResponse(tp0, 1, Errors.NONE, 1));
 
-        sender.run(time.milliseconds()); // receive response 1
+        sender.runOnce(); // receive response 1
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
         assertFalse(request1.isDone());
@@ -948,7 +948,7 @@ public class SenderTest {
 
         client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.REQUEST_TIMED_OUT, -1));
 
-        sender.run(time.milliseconds()); // receive response 0
+        sender.runOnce(); // receive response 0
 
         // Make sure we requeued both batches in the correct order.
         assertEquals(1, queuedBatches.size());
@@ -956,7 +956,7 @@ public class SenderTest {
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
 
-        sender.run(time.milliseconds()); // resend request 0
+        sender.runOnce(); // resend request 0
         assertEquals(1, client.inFlightRequestCount());
 
         assertEquals(1, client.inFlightRequestCount());
@@ -964,7 +964,7 @@ public class SenderTest {
 
         // Make sure we handle the out of order successful responses correctly.
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
-        sender.run(time.milliseconds());  // receive response 0
+        sender.runOnce();  // receive response 0
         assertEquals(0, queuedBatches.size());
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertEquals(0, client.inFlightRequestCount());
@@ -991,7 +991,7 @@ public class SenderTest {
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertFutureFailure(request1, TimeoutException.class);
         assertFalse(transactionManager.hasUnresolvedSequence(tp0));
@@ -1008,18 +1008,18 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         // We separate the two appends by 1 second so that the two batches
         // don't expire at the same time.
         time.sleep(1000L);
 
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1);
-        sender.run(time.milliseconds());  // receive first response
+        sender.runOnce();  // receive first response
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
         Node node = metadata.fetch().nodes().get(0);
@@ -1029,7 +1029,7 @@ public class SenderTest {
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
-        sender.run(time.milliseconds()); // now expire the first batch.
+        sender.runOnce(); // now expire the first batch.
         assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
         assertEquals(0, sender.inFlightBatches(tp0).size());
@@ -1039,11 +1039,11 @@ public class SenderTest {
         time.sleep(20);
         assertFalse(request2.isDone());
 
-        sender.run(time.milliseconds());  // send second request
+        sender.runOnce();  // send second request
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1);
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
-        sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
+        sender.runOnce(); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
         assertEquals(0, sender.inFlightBatches(tp0).size());
@@ -1055,7 +1055,7 @@ public class SenderTest {
         assertEquals(2L, transactionManager.sequenceNumber(tp0).longValue());
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
 
-        sender.run(time.milliseconds());  // clear the unresolved state, send the pending request.
+        sender.runOnce();  // clear the unresolved state, send the pending request.
         assertFalse(transactionManager.hasUnresolvedSequence(tp0));
         assertTrue(transactionManager.hasProducerId());
         assertEquals(0, batches.size());
@@ -1075,23 +1075,23 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
 
         time.sleep(1000L);
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
 
         assertEquals(2, client.inFlightRequestCount());
 
         sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
-        sender.run(time.milliseconds());  // receive first response
+        sender.runOnce();  // receive first response
 
         Node node = metadata.fetch().nodes().get(0);
         time.sleep(1000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
-        sender.run(time.milliseconds()); // now expire the first batch.
+        sender.runOnce(); // now expire the first batch.
         assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
         // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
@@ -1099,9 +1099,9 @@ public class SenderTest {
 
         time.sleep(20);
         assertFalse(request2.isDone());
-        sender.run(time.milliseconds());  // send second request
+        sender.runOnce();  // send second request
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
-        sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
+        sender.runOnce(); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
         assertFutureFailure(request2, OutOfOrderSequenceException.class);
 
         Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
@@ -1128,10 +1128,10 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
 
-        sender.run(time.milliseconds());  // receive response
+        sender.runOnce();  // receive response
         assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
 
         Node node = metadata.fetch().nodes().get(0);
@@ -1139,7 +1139,7 @@ public class SenderTest {
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
-        sender.run(time.milliseconds()); // now expire the batch.
+        sender.runOnce(); // now expire the batch.
 
         assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
@@ -1170,8 +1170,8 @@ public class SenderTest {
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // connect.
-        sender.run(time.milliseconds());  // send.
+        sender.runOnce();  // connect.
+        sender.runOnce();  // send.
 
         assertEquals(1, client.inFlightRequestCount());
 
@@ -1179,16 +1179,16 @@ public class SenderTest {
         responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(failedResponse.isDone());
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
         prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
         assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
-        sender.run(time.milliseconds());  // send request to tp1
+        sender.runOnce();  // send request to tp1
 
         assertFalse(successfulResponse.isDone());
         client.respond(produceResponse(tp1, 10, Errors.NONE, -1));
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(successfulResponse.isDone());
         assertEquals(10, successfulResponse.get().offset());
@@ -1214,7 +1214,7 @@ public class SenderTest {
             "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
             "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // connect and send.
+        sender.runOnce();  // connect and send.
 
         assertEquals(1, client.inFlightRequestCount());
 
@@ -1223,15 +1223,15 @@ public class SenderTest {
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
         sender.initiateClose(); // initiate close
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(failedResponse.isDone());
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
 
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
-                prepareInitPidResponse(Errors.NONE, producerId + 1, (short) 1);
-                sender.run(time.milliseconds());
+                prepareInitProducerResponse(Errors.NONE, producerId + 1, (short) 1);
+                sender.runOnce();
                 return !accumulator.hasUndrained();
             }
         }, 5000, "Failed to drain batches");
@@ -1253,7 +1253,7 @@ public class SenderTest {
             "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
             "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // connect and send.
+        sender.runOnce();  // connect and send.
 
         assertEquals(1, client.inFlightRequestCount());
 
@@ -1261,11 +1261,11 @@ public class SenderTest {
         responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(failedResponse.isDone());
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
         sender.forceClose(); // initiate force close
-        sender.run(time.milliseconds()); // this should not block
+        sender.runOnce(); // this should not block
         sender.run(); // run main loop to test forceClose flag
         assertTrue("Pending batches are not aborted.", !accumulator.hasUndrained());
         assertTrue(successfulResponse.isDone());
@@ -1289,8 +1289,8 @@ public class SenderTest {
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // connect.
-        sender.run(time.milliseconds());  // send.
+        sender.runOnce();  // connect.
+        sender.runOnce();  // send.
 
         assertEquals(1, client.inFlightRequestCount());
 
@@ -1298,17 +1298,17 @@ public class SenderTest {
         responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(failedResponse.isDone());
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
         prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
         assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
-        sender.run(time.milliseconds());  // send request to tp1 with the old producerId
+        sender.runOnce();  // send request to tp1 with the old producerId
 
         assertFalse(successfulResponse.isDone());
         // The response comes back with a retriable error.
         client.respond(produceResponse(tp1, 0, Errors.NOT_LEADER_FOR_PARTITION, -1));
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(successfulResponse.isDone());
         // Since the batch has an old producerId, it will not be retried yet again, but will be failed with a Fatal
@@ -1333,7 +1333,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
@@ -1342,7 +1342,7 @@ public class SenderTest {
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -1355,14 +1355,14 @@ public class SenderTest {
 
         client.respondToRequest(secondClientRequest, produceResponse(tp0, 1000, Errors.NONE, 0));
 
-        sender.run(time.milliseconds()); // receive response 1
+        sender.runOnce(); // receive response 1
 
         assertEquals(1000, transactionManager.lastAckedOffset(tp0));
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
 
         client.respondToRequest(firstClientRequest, produceResponse(tp0, ProduceResponse.INVALID_OFFSET, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
 
-        sender.run(time.milliseconds()); // receive response 0
+        sender.runOnce(); // receive response 0
 
         // Make sure that the last ack'd sequence doesn't change.
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
@@ -1386,7 +1386,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
@@ -1394,7 +1394,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
-        sender.run(time.milliseconds());  // receive the response.
+        sender.runOnce();  // receive the response.
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
@@ -1404,14 +1404,14 @@ public class SenderTest {
         // Send second ProduceRequest, a single batch with 2 records.
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
 
         sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
-        sender.run(time.milliseconds()); // receive response 0, should be retried since the logStartOffset > lastAckedOffset.
+        sender.runOnce(); // receive response 0, should be retried since the logStartOffset > lastAckedOffset.
 
         // We should have reset the sequence number state of the partition because the state was lost on the broker.
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -1419,11 +1419,11 @@ public class SenderTest {
         assertFalse(request2.isDone());
         assertFalse(client.hasInFlightRequests());
 
-        sender.run(time.milliseconds()); // should retry request 1
+        sender.runOnce(); // should retry request 1
 
         // resend the request. Note that the expected sequence is 0, since we have lost producer state on the broker.
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
-        sender.run(time.milliseconds()); // receive response 1
+        sender.runOnce(); // receive response 1
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(client.hasInFlightRequests());
@@ -1444,7 +1444,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
@@ -1452,7 +1452,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
-        sender.run(time.milliseconds());  // receive the response.
+        sender.runOnce();  // receive the response.
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
@@ -1461,14 +1461,14 @@ public class SenderTest {
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
 
         sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, -1L);
-        sender.run(time.milliseconds()); // receive response 0, should be retried without resetting the sequence numbers since the log start offset is unknown.
+        sender.runOnce(); // receive response 0, should be retried without resetting the sequence numbers since the log start offset is unknown.
 
        // The sequence number state should not have been reset, since the log start offset in the response was unknown.
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
@@ -1476,12 +1476,12 @@ public class SenderTest {
         assertFalse(request2.isDone());
         assertFalse(client.hasInFlightRequests());
 
-        sender.run(time.milliseconds()); // should retry request 1
+        sender.runOnce(); // should retry request 1
 
         // resend the request. Note that the expected sequence is 1, since we never got the logStartOffset in the previous
         // response and hence we didn't reset the sequence numbers.
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L);
-        sender.run(time.milliseconds()); // receive response 1
+        sender.runOnce(); // receive response 1
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertFalse(client.hasInFlightRequests());
@@ -1502,7 +1502,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
@@ -1510,7 +1510,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
-        sender.run(time.milliseconds());  // receive the response.
+        sender.runOnce();  // receive the response.
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
@@ -1519,14 +1519,14 @@ public class SenderTest {
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
 
         // Send the third ProduceRequest, in parallel with the second. It should be retried even though the
         // lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back.
         Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
 
@@ -1536,7 +1536,7 @@ public class SenderTest {
 
 
         sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
-        sender.run(time.milliseconds()); // receive response 2, should reset the sequence numbers and be retried.
+        sender.runOnce(); // receive response 2, should reset the sequence numbers and be retried.
 
         // We should have reset the sequence number state of the partition because the state was lost on the broker.
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -1545,20 +1545,20 @@ public class SenderTest {
         assertFalse(request3.isDone());
         assertEquals(1, client.inFlightRequestCount());
 
-        sender.run(time.milliseconds()); // resend request 2.
+        sender.runOnce(); // resend request 2.
 
         assertEquals(2, client.inFlightRequestCount());
 
         // receive the original response 3. note the expected sequence is still the originally assigned sequence.
         sendIdempotentProducerResponse(2, tp0, Errors.UNKNOWN_PRODUCER_ID, -1, 1010L);
-        sender.run(time.milliseconds()); // receive response 3
+        sender.runOnce(); // receive response 3
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
-        sender.run(time.milliseconds());  // receive response 2, don't send request 3 since we can have at most 1 in flight when retrying
+        sender.runOnce();  // receive response 2, don't send request 3 since we can have at most 1 in flight when retrying
 
         assertTrue(request2.isDone());
         assertFalse(request3.isDone());
@@ -1567,11 +1567,11 @@ public class SenderTest {
         assertEquals(1011L, request2.get().offset());
         assertEquals(1011L, transactionManager.lastAckedOffset(tp0));
 
-        sender.run(time.milliseconds());  // resend request 3.
+        sender.runOnce();  // resend request 3.
         assertEquals(1, client.inFlightRequestCount());
 
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1012L, 1010L);
-        sender.run(time.milliseconds());  // receive response 3.
+        sender.runOnce();  // receive response 3.
 
         assertFalse(client.hasInFlightRequests());
         assertTrue(request3.isDone());
@@ -1591,7 +1591,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
@@ -1599,7 +1599,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
 
-        sender.run(time.milliseconds());  // receive the response.
+        sender.runOnce();  // receive the response.
 
         assertTrue(request1.isDone());
         assertEquals(1000L, request1.get().offset());
@@ -1608,14 +1608,14 @@ public class SenderTest {
 
         // Send second ProduceRequest,
         Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
 
         sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
-        sender.run(time.milliseconds()); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset
+        sender.runOnce(); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset
         assertFutureFailure(request2, OutOfOrderSequenceException.class);
 
     }
@@ -1659,7 +1659,7 @@ public class SenderTest {
             }
         }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFutureFailure(future, ClusterAuthorizationException.class);
 
         // cluster authorization errors are fatal, so we should continue seeing it on future sends
@@ -1679,11 +1679,11 @@ public class SenderTest {
         // cluster authorization is a fatal error for the producer
         Future<RecordMetadata> future1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                 null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         Future<RecordMetadata> future2 = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                 null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         client.respond(new MockClient.RequestMatcher() {
             @Override
@@ -1692,11 +1692,11 @@ public class SenderTest {
             }
         }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.hasFatalError());
         assertFutureFailure(future1, ClusterAuthorizationException.class);
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFutureFailure(future2, ClusterAuthorizationException.class);
 
         // Should be fine if the second response eventually returns
@@ -1706,7 +1706,7 @@ public class SenderTest {
                 return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
             }
         }, produceResponse(tp1, 0, Errors.NONE, 0));
-        sender.run(time.milliseconds());
+        sender.runOnce();
     }
 
     @Test
@@ -1727,7 +1727,7 @@ public class SenderTest {
             }
         }, produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFutureFailure(future, UnsupportedForMessageFormatException.class);
 
         // unsupported for message format is not a fatal error
@@ -1752,7 +1752,7 @@ public class SenderTest {
             }
         });
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFutureFailure(future, UnsupportedVersionException.class);
 
         // unsupported version errors are fatal, so we should continue seeing it on future sends
@@ -1794,10 +1794,10 @@ public class SenderTest {
             }
         }, produceResponse(tp0, 0, Errors.NONE, 0));
 
-        sender.run(time.milliseconds());  // connect.
-        sender.run(time.milliseconds());  // send.
+        sender.runOnce();  // connect.
+        sender.runOnce();  // send.
 
-        sender.run(time.milliseconds());  // receive response
+        sender.runOnce();  // receive response
         assertTrue(responseFuture.isDone());
         assertEquals(0L, (long) transactionManager.lastAckedSequence(tp0));
         assertEquals(1L, (long) transactionManager.sequenceNumber(tp0));
@@ -1818,21 +1818,21 @@ public class SenderTest {
                 senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // connect.
-        sender.run(time.milliseconds());  // send.
+        sender.runOnce();  // connect.
+        sender.runOnce();  // send.
         String id = client.requests().peek().destination();
         Node node = new Node(Integer.valueOf(id), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
-        assertTrue("Client ready status should be true", client.isReady(node, 0L));
+        assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
         client.disconnect(id);
         assertEquals(0, client.inFlightRequestCount());
-        assertFalse("Client ready status should be false", client.isReady(node, 0L));
+        assertFalse("Client ready status should be false", client.isReady(node, time.milliseconds()));
 
         transactionManager.resetProducerId();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
-        sender.run(time.milliseconds()); // receive error
-        sender.run(time.milliseconds()); // reconnect
-        sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
+        sender.runOnce(); // receive error
+        sender.runOnce(); // reconnect
+        sender.runOnce(); // nothing to do, since the pid has changed. We should check the metrics for errors.
         assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
 
         KafkaMetric recordErrors = m.metrics().get(senderMetrics.recordErrorRate);
@@ -1857,15 +1857,15 @@ public class SenderTest {
                 senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // connect.
-        sender.run(time.milliseconds());  // send.
+        sender.runOnce();  // connect.
+        sender.runOnce();  // send.
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
         client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(responseFuture.isDone());
         assertEquals(0, sender.inFlightBatches(tp0).size());
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
@@ -1892,7 +1892,7 @@ public class SenderTest {
         txnManager.beginTransaction();
         txnManager.maybeAddPartitionToTransaction(tp);
         client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
     }
@@ -1923,25 +1923,25 @@ public class SenderTest {
                     accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
             Future<RecordMetadata> f2 =
                     accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
-            sender.run(time.milliseconds()); // connect
-            sender.run(time.milliseconds()); // send produce request
+            sender.runOnce(); // connect
+            sender.runOnce(); // send produce request
 
             assertEquals("The next sequence should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.valueOf(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
-            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+            assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
 
             Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
             responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
             client.respond(new ProduceResponse(responseMap));
-            sender.run(time.milliseconds()); // split and reenqueue
+            sender.runOnce(); // split and reenqueue
             assertEquals("The next sequence should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             // The compression ratio should have been improved once.
             assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
                     CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
-            sender.run(time.milliseconds()); // send the first produce request
+            sender.runOnce(); // send the first produce request
             assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             assertFalse("The future shouldn't have been done.", f1.isDone());
             assertFalse("The future shouldn't have been done.", f2.isDone());
@@ -1949,30 +1949,30 @@ public class SenderTest {
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             node = new Node(Integer.valueOf(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
-            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+            assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
 
             responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
             client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
-            sender.run(time.milliseconds()); // receive
+            sender.runOnce(); // receive
             assertTrue("The future should have been done.", f1.isDone());
             assertEquals("The next sequence number should still be 2", 2, txnManager.sequenceNumber(tp).longValue());
             assertEquals("The last ack'd sequence number should be 0", 0, txnManager.lastAckedSequence(tp));
             assertFalse("The future shouldn't have been done.", f2.isDone());
             assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
-            sender.run(time.milliseconds()); // send the second produce request
+            sender.runOnce(); // send the second produce request
             id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             node = new Node(Integer.valueOf(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
-            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+            assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
 
             responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L));
             client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
-            sender.run(time.milliseconds()); // receive
+            sender.runOnce(); // receive
             assertTrue("The future should have been done.", f2.isDone());
             assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
@@ -1993,19 +1993,19 @@ public class SenderTest {
         // Send first ProduceRequest
         Future<RecordMetadata> request1 =
             accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
         time.sleep(deliverTimeoutMs);
         assertFalse(pool.allMatch());
 
-        sender.run(time.milliseconds());  // expire the batch
+        sender.runOnce();  // expire the batch
         assertTrue(request1.isDone());
         assertTrue("The batch should have been de-allocated", pool.allMatch());
         assertTrue(pool.allMatch());
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue("The batch should have been de-allocated", pool.allMatch());
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
@@ -2018,7 +2018,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
 
@@ -2027,7 +2027,7 @@ public class SenderTest {
         client.respond(new ProduceResponse(responseMap));
 
         time.sleep(deliveryTimeoutMs);
-        sender.run(time.milliseconds());  // receive first response
+        sender.runOnce();  // receive first response
         assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());
         try {
             request.get();
@@ -2044,7 +2044,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
@@ -2052,18 +2052,18 @@ public class SenderTest {
 
         // Send second ProduceRequest
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
-        sender.run(time.milliseconds());  // must not send request because the partition is muted
+        sender.runOnce();  // must not send request because the partition is muted
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
         time.sleep(deliveryTimeoutMs / 2); // expire the first batch only
 
         client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L));
-        sender.run(time.milliseconds());  // receive response (offset=0)
+        sender.runOnce();  // receive response (offset=0)
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
 
-        sender.run(time.milliseconds());  // Drain the second request only this time
+        sender.runOnce();  // Drain the second request only this time
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
     }
@@ -2077,7 +2077,7 @@ public class SenderTest {
         Future<RecordMetadata> request1 =
             accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
                 MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         time.sleep(deliverTimeoutMs);
 
@@ -2085,16 +2085,16 @@ public class SenderTest {
         responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
         client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1)); // return a retriable error
 
-        sender.run(time.milliseconds());  // expire the batch
+        sender.runOnce();  // expire the batch
         assertTrue(request1.isDone());
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
 
-        sender.run(time.milliseconds()); // receive first response and do not reenqueue.
+        sender.runOnce(); // receive first response and do not reenqueue.
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
 
-        sender.run(time.milliseconds()); // run again and must not send anything.
+        sender.runOnce(); // run again and must not send anything.
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
     }
@@ -2110,21 +2110,21 @@ public class SenderTest {
             accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
                 MAX_BLOCK_TIMEOUT).future;
 
-        sender.run(time.milliseconds());  // send request
+        sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         // return a MESSAGE_TOO_LARGE error
         client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
 
         time.sleep(deliverTimeoutMs);
         // expire the batch and process the response
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(request1.isDone());
         assertTrue(request2.isDone());
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
 
        // run again and must not split the big batch or resend anything.
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
     }
@@ -2138,10 +2138,10 @@ public class SenderTest {
         accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null,
                 MAX_BLOCK_TIMEOUT);
 
-        sender.run(time.milliseconds());
-        sender.run(time.milliseconds());
+        sender.runOnce();
+        sender.runOnce();
         time.setCurrentTimeMs(time.milliseconds() + accumulator.getDeliveryTimeoutMs() + 1);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         InOrder inOrder = inOrder(client);
         inOrder.verify(client, atLeastOnce()).ready(any(), anyLong());
@@ -2164,7 +2164,7 @@ public class SenderTest {
         Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
 
         // Send request.
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(1, client.inFlightRequestCount());
         assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
 
@@ -2174,7 +2174,7 @@ public class SenderTest {
 
         // Successfully expire both batches.
         time.sleep(deliveryTimeoutMs);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());
 
         try {
@@ -2304,7 +2304,7 @@ public class SenderTest {
     private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
         Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                 null, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(future.isDone());
         try {
             future.get();
@@ -2325,17 +2325,17 @@ public class SenderTest {
                 return body instanceof InitProducerIdRequest && ((InitProducerIdRequest) body).transactionalId() == null;
             }
         }, new InitProducerIdResponse(0, error, producerId, producerEpoch));
-        sender.run(time.milliseconds());
+        sender.runOnce();
     }
 
     private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE);
-        sender.run(time.milliseconds());
-        sender.run(time.milliseconds());
+        sender.runOnce();
+        sender.runOnce();
 
-        prepareInitPidResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
-        sender.run(time.milliseconds());
+        prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
+        sender.runOnce();
         assertTrue(transactionManager.hasProducerId());
     }
 
@@ -2343,8 +2343,8 @@ public class SenderTest {
         client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
     }
 
-    private void prepareInitPidResponse(Errors error, long pid, short epoch) {
-        client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
+    private void prepareInitProducerResponse(Errors error, long producerId, short producerEpoch) {
+        client.prepareResponse(new InitProducerIdResponse(0, error, producerId, producerEpoch));
     }
 
     private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 92ae5f0..d36ba27 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -163,7 +163,7 @@ public class TransactionManagerTest {
 
         transactionManager.maybeAddPartitionToTransaction(tp0);
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
 
         transactionManager.beginCommit();
@@ -233,13 +233,13 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareAddPartitionsToTxn(partition, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         transactionManager.beginAbort();
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(transactionManager.hasOngoingTransaction());
     }
 
@@ -260,13 +260,13 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareAddPartitionsToTxn(partition, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         transactionManager.beginCommit();
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(transactionManager.hasOngoingTransaction());
     }
 
@@ -287,7 +287,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareAddPartitionsToTxn(partition, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         transactionManager.transitionToAbortableError(new KafkaException());
         assertTrue(transactionManager.hasOngoingTransaction());
@@ -296,7 +296,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(transactionManager.hasOngoingTransaction());
     }
 
@@ -317,7 +317,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareAddPartitionsToTxn(partition, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         transactionManager.transitionToFatalError(new KafkaException());
         assertFalse(transactionManager.hasOngoingTransaction());
@@ -337,7 +337,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isPartitionPendingAdd(partition));
 
         prepareAddPartitionsToTxn(partition, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertFalse(transactionManager.hasPartitionsToAdd());
         assertTrue(transactionManager.isPartitionAdded(partition));
@@ -364,7 +364,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isPartitionPendingAdd(partition));
 
         prepareAddPartitionsToTxn(partition, Errors.CONCURRENT_TRANSACTIONS);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         TransactionManager.TxnRequestHandler handler = transactionManager.nextRequestHandler(false);
         assertNotNull(handler);
@@ -385,7 +385,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isPartitionPendingAdd(partition));
 
         prepareAddPartitionsToTxn(partition, Errors.COORDINATOR_NOT_AVAILABLE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         TransactionManager.TxnRequestHandler handler = transactionManager.nextRequestHandler(false);
         assertNotNull(handler);
@@ -406,7 +406,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isPartitionPendingAdd(partition));
 
         prepareAddPartitionsToTxn(partition, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(partition));
 
         TopicPartition otherPartition = new TopicPartition("foo", 1);
@@ -476,7 +476,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         // Send the AddPartitionsToTxn request and leave it in-flight
-        sender.run(time.milliseconds());
+        sender.runOnce();
         transactionManager.transitionToAbortableError(new KafkaException());
 
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
@@ -509,7 +509,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         // Send the AddPartitionsToTxn request and leave it in-flight
-        sender.run(time.milliseconds());
+        sender.runOnce();
         transactionManager.transitionToFatalError(new KafkaException());
 
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
@@ -527,7 +527,7 @@ public class TransactionManagerTest {
 
         transactionManager.maybeAddPartitionToTransaction(tp0);
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(transactionManager.hasPartitionsToAdd());
         transactionManager.transitionToAbortableError(new KafkaException());
 
@@ -545,7 +545,7 @@ public class TransactionManagerTest {
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(transactionManager.hasPartitionsToAdd());
         transactionManager.transitionToFatalError(new KafkaException());
 
@@ -617,13 +617,13 @@ public class TransactionManagerTest {
         prepareProduceResponse(Errors.NONE, pid, epoch);
         assertFalse(transactionManager.transactionContainsPartition(tp0));
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
-        sender.run(time.milliseconds());  // send addPartitions.
+        sender.runOnce();  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
         assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
         assertFalse(responseFuture.isDone());
 
-        sender.run(time.milliseconds());  // send produce request.
+        sender.runOnce();  // send produce request.
         assertTrue(responseFuture.isDone());
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -635,7 +635,7 @@ public class TransactionManagerTest {
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
 
-        sender.run(time.milliseconds());  // Send AddOffsetsRequest
+        sender.runOnce();  // Send AddOffsetsRequest
         assertTrue(transactionManager.hasPendingOffsetCommits());  // We should now have created and queued the offset commit request.
         assertFalse(addOffsetsResult.isCompleted()); // the result doesn't complete until TxnOffsetCommit returns
 
@@ -646,19 +646,19 @@ public class TransactionManagerTest {
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
 
         assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
-        sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
-        sender.run(time.milliseconds());  // send find coordinator for group request
+        sender.runOnce();  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
+        sender.runOnce();  // send find coordinator for group request
         assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
         assertTrue(transactionManager.hasPendingOffsetCommits());
 
-        sender.run(time.milliseconds());  // send TxnOffsetCommitRequest commit.
+        sender.runOnce();  // send TxnOffsetCommitRequest commit.
 
         assertFalse(transactionManager.hasPendingOffsetCommits());
         assertTrue(addOffsetsResult.isCompleted());  // We should only be done after both RPCs complete.
 
         transactionManager.beginCommit();
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        sender.run(time.milliseconds());  // commit.
+        sender.runOnce();  // commit.
 
         assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
@@ -671,11 +671,11 @@ public class TransactionManagerTest {
         // It finds the coordinator and then gets a PID.
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, true, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // find coordinator, connection lost.
+        sender.runOnce();  // find coordinator, connection lost.
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
+        sender.runOnce();  // find coordinator
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
     }
 
@@ -689,8 +689,8 @@ public class TransactionManagerTest {
             return true;
         });
 
-        sender.run(time.milliseconds()); // InitProducerRequest is queued
-        sender.run(time.milliseconds()); // FindCoordinator is queued after peeking InitProducerRequest
+        sender.runOnce(); // InitProducerRequest is queued
+        sender.runOnce(); // FindCoordinator is queued after peeking InitProducerRequest
         assertTrue(transactionManager.hasFatalError());
         assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
     }
@@ -699,8 +699,8 @@ public class TransactionManagerTest {
     public void testUnsupportedInitTransactions() {
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds()); // InitProducerRequest is queued
-        sender.run(time.milliseconds()); // FindCoordinator is queued after peeking InitProducerRequest
+        sender.runOnce(); // InitProducerRequest is queued
+        sender.runOnce(); // FindCoordinator is queued after peeking InitProducerRequest
 
         assertFalse(transactionManager.hasError());
         assertNotNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -712,7 +712,7 @@ public class TransactionManagerTest {
             return true;
         });
 
-        sender.run(time.milliseconds()); // InitProducerRequest is dequeued
+        sender.runOnce(); // InitProducerRequest is dequeued
         assertTrue(transactionManager.hasFatalError());
         assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
     }
@@ -731,14 +731,14 @@ public class TransactionManagerTest {
                 singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
-        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
-        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+        sender.runOnce();  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.runOnce();  // FindCoordinator Enqueued
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
-        sender.run(time.milliseconds());  // FindCoordinator Returned
+        sender.runOnce();  // FindCoordinator Returned
 
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
-        sender.run(time.milliseconds());  // TxnOffsetCommit Handled
+        sender.runOnce();  // TxnOffsetCommit Handled
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
@@ -756,24 +756,24 @@ public class TransactionManagerTest {
         final short epoch = 1;
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
+        sender.runOnce();  // find coordinator
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
         prepareInitPidResponse(Errors.NONE, true, pid, epoch);
         // send pid to coordinator, should get disconnected before receiving the response, and resend the
         // FindCoordinator and InitPid requests.
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-        sender.run(time.milliseconds());  // get pid and epoch
+        sender.runOnce();  // get pid and epoch
 
         assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
         assertTrue(transactionManager.hasProducerId());
@@ -789,15 +789,15 @@ public class TransactionManagerTest {
         final short epoch = 1;
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // one loop to realize we need a coordinator.
-        sender.run(time.milliseconds());  // next loop to find coordintor.
+        sender.runOnce();  // one loop to realize we need a coordinator.
+        sender.runOnce();  // next loop to find coordinator.
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
 
         client.disconnect(brokerNode.idString());
         client.blackout(brokerNode, 100);
         // send pid to coordinator. Should get disconnected before the send and resend the FindCoordinator
         // and InitPid requests.
-        sender.run(time.milliseconds());
+        sender.runOnce();
         time.sleep(110);  // wait for the node's blackout period to expire.
 
         assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -805,11 +805,11 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-        sender.run(time.milliseconds());  // get pid and epoch
+        sender.runOnce();  // get pid and epoch
 
         assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
         assertTrue(transactionManager.hasProducerId());
@@ -825,23 +825,23 @@ public class TransactionManagerTest {
         final short epoch = 1;
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
+        sender.runOnce();  // find coordinator
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch);
-        sender.run(time.milliseconds());  // send pid, get not coordinator. Should resend the FindCoordinator and InitPid requests
+        sender.runOnce();  // send pid, get not coordinator. Should resend the FindCoordinator and InitPid requests
 
         assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-        sender.run(time.milliseconds());  // get pid and epoch
+        sender.runOnce();  // get pid and epoch
 
         assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
         assertTrue(transactionManager.hasProducerId());
@@ -854,13 +854,13 @@ public class TransactionManagerTest {
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false,
                 CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
+        sender.runOnce();  // find coordinator
+        sender.runOnce();
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
 
-        sender.run(time.milliseconds()); // one more run to fail the InitProducerId future
+        sender.runOnce(); // one more run to fail the InitProducerId future
         assertTrue(initPidResult.isCompleted());
         assertFalse(initPidResult.isSuccessful());
         assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException);
@@ -873,12 +873,12 @@ public class TransactionManagerTest {
         final long pid = 13131L;
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
+        sender.runOnce();  // find coordinator
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, RecordBatch.NO_PRODUCER_EPOCH);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(transactionManager.hasError());
         assertTrue(initPidResult.isCompleted());
@@ -901,12 +901,12 @@ public class TransactionManagerTest {
                 singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), consumerGroupId);
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
-        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
-        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+        sender.runOnce();  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.runOnce();  // FindCoordinator Enqueued
 
         prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId);
-        sender.run(time.milliseconds());  // FindCoordinator Failed
-        sender.run(time.milliseconds());  // TxnOffsetCommit Aborted
+        sender.runOnce();  // FindCoordinator Failed
+        sender.runOnce();  // TxnOffsetCommit Aborted
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
         assertTrue(sendOffsetsResult.isCompleted());
@@ -933,14 +933,14 @@ public class TransactionManagerTest {
                 singletonMap(tp1, new OffsetAndMetadata(39L)), consumerGroupId);
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
-        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
-        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+        sender.runOnce();  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.runOnce();  // FindCoordinator Enqueued
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
-        sender.run(time.milliseconds());  // FindCoordinator Returned
+        sender.runOnce();  // FindCoordinator Returned
 
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp1, Errors.GROUP_AUTHORIZATION_FAILED));
-        sender.run(time.milliseconds());  // TxnOffsetCommit Handled
+        sender.runOnce();  // TxnOffsetCommit Handled
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
@@ -969,7 +969,7 @@ public class TransactionManagerTest {
                 singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
 
         prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
-        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled
+        sender.runOnce();  // AddOffsetsToTxn Handled
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
@@ -994,14 +994,14 @@ public class TransactionManagerTest {
                 singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
-        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
-        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+        sender.runOnce();  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.runOnce();  // FindCoordinator Enqueued
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
-        sender.run(time.milliseconds());  // FindCoordinator Returned
+        sender.runOnce();  // FindCoordinator Returned
 
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
-        sender.run(time.milliseconds());  // TxnOffsetCommit Handled
+        sender.runOnce();  // TxnOffsetCommit Handled
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
@@ -1029,7 +1029,7 @@ public class TransactionManagerTest {
         errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
 
         prepareAddPartitionsToTxn(errors);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
@@ -1060,16 +1060,16 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(transactionManager.hasAbortableError());
         transactionManager.beginAbort();
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(responseFuture.isDone());
         assertFutureFailed(responseFuture);
 
         // No partitions added, so no need to prepare EndTxn response
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isReady());
         assertFalse(transactionManager.hasPartitionsToAdd());
         assertFalse(accumulator.hasIncomplete());
@@ -1083,19 +1083,19 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.hasPartitionsToAdd());
 
         transactionManager.beginCommit();
         prepareProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(responseFuture.isDone());
         assertNotNull(responseFuture.get());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(transactionManager.isReady());
     }
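
For orientation, the AddPartitionsToTxn -> Produce -> EndTxn sequence that these
tests drive one runOnce() iteration at a time is the internal counterpart of the
public transactional producer API. Below is a minimal sketch of that
application-level flow; the bootstrap address, transactional.id, and topic are
illustrative, not taken from this patch.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TransactionalProduceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // illustrative
            props.put("transactional.id", "demo-txn");         // illustrative
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();   // FindCoordinator + InitProducerId
                producer.beginTransaction();
                // send() adds new partitions to the transaction, then produces
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
                producer.commitTransaction();  // EndTxn(COMMIT)
            }
        }
    }
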
@@ -1114,14 +1114,14 @@ public class TransactionManagerTest {
 
         Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
                 "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
 
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
         Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
                 "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.hasAbortableError());
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
@@ -1130,7 +1130,7 @@ public class TransactionManagerTest {
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
         transactionManager.beginAbort();
-        sender.run(time.milliseconds());
+        sender.runOnce();
         // neither produce request has been sent, so they should both be failed immediately
         assertFutureFailed(authorizedTopicProduceFuture);
         assertFutureFailed(unauthorizedTopicProduceFuture);
@@ -1147,19 +1147,19 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.hasPartitionsToAdd());
 
         transactionManager.beginCommit();
         prepareProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(nextTransactionFuture.isDone());
         assertNotNull(nextTransactionFuture.get());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(transactionManager.isReady());
     }
@@ -1178,12 +1178,12 @@ public class TransactionManagerTest {
 
         Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
                 "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
 
         accumulator.beginFlush();
         prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(authorizedTopicProduceFuture.isDone());
         assertTrue(accumulator.hasIncomplete());
 
@@ -1191,14 +1191,14 @@ public class TransactionManagerTest {
         Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
                 "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.hasAbortableError());
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
         assertFalse(authorizedTopicProduceFuture.isDone());
 
         prepareProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFutureFailed(unauthorizedTopicProduceFuture);
         assertTrue(authorizedTopicProduceFuture.isDone());
         assertNotNull(authorizedTopicProduceFuture.get());
@@ -1206,7 +1206,7 @@ public class TransactionManagerTest {
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
         transactionManager.beginAbort();
-        sender.run(time.milliseconds());
+        sender.runOnce();
         // the transaction was aborted, so it should be ready to begin again
         assertTrue(transactionManager.isReady());
         assertFalse(transactionManager.hasPartitionsToAdd());
@@ -1221,19 +1221,19 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.hasPartitionsToAdd());
 
         transactionManager.beginCommit();
         prepareProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(nextTransactionFuture.isDone());
         assertNotNull(nextTransactionFuture.get());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(transactionManager.isReady());
     }
@@ -1250,7 +1250,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp);
 
         prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
@@ -1283,13 +1283,13 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.transactionContainsPartition(tp0));
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
 
-        sender.run(time.milliseconds());  // AddPartitions.
+        sender.runOnce();  // AddPartitions.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
         assertFalse(responseFuture.isDone());
         assertFalse(commitResult.isCompleted());
 
         prepareProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());  // Produce.
+        sender.runOnce();  // Produce.
         assertTrue(responseFuture.isDone());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
@@ -1297,7 +1297,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasOngoingTransaction());
         assertTrue(transactionManager.isCompleting());
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(commitResult.isCompleted());
         assertFalse(transactionManager.hasOngoingTransaction());
     }
@@ -1322,7 +1322,7 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.transactionContainsPartition(tp0));
 
         // Sender flushes one AddPartitions request. The produce goes next.
-        sender.run(time.milliseconds());  // send addPartitions.
+        sender.runOnce();  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
 
@@ -1340,14 +1340,14 @@ public class TransactionManagerTest {
         assertFalse(secondResponseFuture.isDone());
 
         // The second add partitions should go out here.
-        sender.run(time.milliseconds());  // send second add partitions request
+        sender.runOnce();  // send second add partitions request
         assertTrue(transactionManager.transactionContainsPartition(tp1));
 
         assertFalse(responseFuture.isDone());
         assertFalse(secondResponseFuture.isDone());
 
         // Finally we get to the produce.
-        sender.run(time.milliseconds());  // send produce request
+        sender.runOnce();  // send produce request
 
         assertTrue(responseFuture.isDone());
         assertTrue(secondResponseFuture.isDone());
@@ -1369,9 +1369,9 @@ public class TransactionManagerTest {
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, pid, epoch);
-        sender.run(time.milliseconds()); // Add partitions.
+        sender.runOnce(); // Add partitions.
 
-        sender.run(time.milliseconds());  // send produce.
+        sender.runOnce();  // send produce.
 
         assertTrue(responseFuture.isDone());
         assertTrue(transactionManager.hasError());
@@ -1396,11 +1396,11 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
 
-        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        sender.runOnce();  // Send AddPartitionsRequest
         assertFalse(commitResult.isCompleted());
-        sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.
+        sender.runOnce();  // Send Produce Request, returns OutOfOrderSequenceException.
 
-        sender.run(time.milliseconds());  // try to commit.
+        sender.runOnce();  // try to commit.
         assertTrue(commitResult.isCompleted());  // the commit should fail exceptionally without being sent.
 
         try {
@@ -1420,7 +1420,7 @@ public class TransactionManagerTest {
         // Commit is not allowed, so let's abort and try again.
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
-        sender.run(time.milliseconds());  // Send abort request. It is valid to transition from ERROR to ABORT
+        sender.runOnce();  // Send abort request. It is valid to transition from ERROR to ABORT
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
@@ -1445,11 +1445,11 @@ public class TransactionManagerTest {
         prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
 
-        sender.run(time.milliseconds());  // Send AddPartitionsRequest
-        sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.
+        sender.runOnce();  // Send AddPartitionsRequest
+        sender.runOnce();  // Send Produce Request, returns OutOfOrderSequenceException.
 
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
-        sender.run(time.milliseconds());  // try to abort
+        sender.runOnce();  // try to abort
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
         assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
@@ -1471,8 +1471,8 @@ public class TransactionManagerTest {
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
 
-        sender.run(time.milliseconds());  // Send AddPartitionsRequest
-        sender.run(time.milliseconds());  // Send Produce Request
+        sender.runOnce();  // Send AddPartitionsRequest
+        sender.runOnce();  // Send Produce Request
 
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
         assertTrue(transactionManager.isAborting());
@@ -1480,13 +1480,13 @@ public class TransactionManagerTest {
 
         sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
-        sender.run(time.milliseconds());  // receive the produce response
+        sender.runOnce();  // receive the produce response
 
         // we do not transition to ABORTABLE_ERROR since we were already aborting
         assertTrue(transactionManager.isAborting());
         assertFalse(transactionManager.hasError());
 
-        sender.run(time.milliseconds());  // handle the abort
+        sender.runOnce();  // handle the abort
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
         assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
@@ -1506,19 +1506,19 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(accumulator.hasUndrained());
 
         // committing the transaction should cause the unsent batch to be flushed
         transactionManager.beginCommit();
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // until the produce future returns, we will not send EndTxn
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
@@ -1526,17 +1526,17 @@ public class TransactionManagerTest {
 
         // now the produce response returns
         sendProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(responseFuture.isDone());
         assertFalse(accumulator.hasUndrained());
         assertFalse(accumulator.hasIncomplete());
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we send EndTxn
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.hasInFlightTransactionalRequest());
         sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertTrue(transactionManager.isReady());
     }
@@ -1555,25 +1555,25 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(accumulator.hasUndrained());
 
         accumulator.beginFlush();
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we begin the commit with the produce request still pending
         transactionManager.beginCommit();
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // until the produce future returns, we will not send EndTxn
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
@@ -1581,17 +1581,17 @@ public class TransactionManagerTest {
 
         // now the produce response returns
         sendProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(responseFuture.isDone());
         assertFalse(accumulator.hasUndrained());
         assertFalse(accumulator.hasIncomplete());
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we send EndTxn
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.hasInFlightTransactionalRequest());
         sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertTrue(transactionManager.isReady());
     }
@@ -1610,16 +1610,16 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
-        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        sender.runOnce();  // Send AddPartitionsRequest
 
         transactionManager.transitionToAbortableError(new KafkaException());
         sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid);
-        sender.run(time.milliseconds()); // AddPartitions returns
+        sender.runOnce(); // AddPartitions returns
         assertTrue(transactionManager.hasAbortableError());
 
         assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds()); // FindCoordinator handled
+        sender.runOnce(); // FindCoordinator handled
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertTrue(transactionManager.hasAbortableError());
     }
@@ -1642,7 +1642,7 @@ public class TransactionManagerTest {
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
         // note that since no partitions were added to the transaction, no EndTxn will be sent
 
-        sender.run(time.milliseconds());  // try to abort
+        sender.runOnce();  // try to abort
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
         assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
@@ -1669,7 +1669,7 @@ public class TransactionManagerTest {
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
-        sender.run(time.milliseconds());  // Send AddPartitions and let it fail
+        sender.runOnce();  // Send AddPartitions and let it fail
         assertFalse(responseFuture.isDone());
 
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
@@ -1678,8 +1678,8 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
 
-        sender.run(time.milliseconds());  // Resend AddPartitions
-        sender.run(time.milliseconds());  // Send EndTxn
+        sender.runOnce();  // Resend AddPartitions
+        sender.runOnce();  // Send EndTxn
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
@@ -1708,8 +1708,8 @@ public class TransactionManagerTest {
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
-        sender.run(time.milliseconds());  // Send AddPartitions
-        sender.run(time.milliseconds());  // Send ProduceRequest and let it fail
+        sender.runOnce();  // Send AddPartitions
+        sender.runOnce();  // Send ProduceRequest and let it fail
 
         assertFalse(responseFuture.isDone());
 
@@ -1719,8 +1719,8 @@ public class TransactionManagerTest {
         prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
 
-        sender.run(time.milliseconds());  // Resend ProduceRequest
-        sender.run(time.milliseconds());  // Send EndTxn
+        sender.runOnce();  // Resend ProduceRequest
+        sender.runOnce();  // Send EndTxn
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
@@ -1746,15 +1746,15 @@ public class TransactionManagerTest {
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid);
 
-        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        sender.runOnce();  // Send AddPartitionsRequest
         assertFalse(transactionManager.transactionContainsPartition(tp0));  // The partition should not yet be added.
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());  // Send AddPartitionsRequest successfully.
+        sender.runOnce();  // Send AddPartitionsRequest successfully.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        sender.run(time.milliseconds());  // Send ProduceRequest.
+        sender.runOnce();  // Send ProduceRequest.
         assertTrue(responseFuture.isDone());
     }
 
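The prepareXxxResponse helpers used throughout these tests follow the usual
MockClient pattern: a canned response is enqueued behind a request matcher, and
a single sender.runOnce() iteration then both sends the pending request and
handles that response. A sketch of the pattern using the test's own client,
sender, and brokerNode fields; the matcher and response shown are illustrative
rather than the helpers' exact bodies.

    // Enqueue a canned FindCoordinator response gated on the request type; the
    // next runOnce() iteration sends the request and handles this response.
    client.prepareResponse(
        body -> body instanceof FindCoordinatorRequest,
        new FindCoordinatorResponse(Errors.NONE, brokerNode));
    sender.runOnce();
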
@@ -1784,7 +1784,7 @@ public class TransactionManagerTest {
         TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
 
-        sender.run(time.milliseconds());  // send AddOffsetsToTxnResult
+        sender.runOnce();  // send AddOffsetsToTxn request
 
         assertFalse(addOffsetsResult.isCompleted());  // The request should complete only after the TxnOffsetCommit completes.
 
@@ -1796,19 +1796,19 @@ public class TransactionManagerTest {
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
 
         assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
-        sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
-        sender.run(time.milliseconds());  // send find coordinator for group request
+        sender.runOnce();  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
+        sender.runOnce();  // send find coordinator for group request
         assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
         assertTrue(transactionManager.hasPendingOffsetCommits());
 
-        sender.run(time.milliseconds());  // send TxnOffsetCommitRequest request.
+        sender.runOnce();  // send TxnOffsetCommit request.
 
         assertTrue(transactionManager.hasPendingOffsetCommits());  // The TxnOffsetCommit failed.
         assertFalse(addOffsetsResult.isCompleted());  // We should only be done after both RPCs complete successfully.
 
         txnOffsetCommitResponse.put(tp1, Errors.NONE);
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
-        sender.run(time.milliseconds());  // Send TxnOffsetCommitRequest again.
+        sender.runOnce();  // Send TxnOffsetCommitRequest again.
 
         assertTrue(addOffsetsResult.isCompleted());
         assertTrue(addOffsetsResult.isSuccessful());
@@ -1830,12 +1830,12 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid);
-        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        sender.runOnce();  // Send AddPartitionsRequest
 
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
         assertFalse(abortResult.isCompleted());
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
     }
@@ -1857,10 +1857,10 @@ public class TransactionManagerTest {
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
         prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
-        sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
+        sender.runOnce();  // Send AddOffsetsToTxnRequest
         assertFalse(abortResult.isCompleted());
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(transactionManager.isReady());
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
@@ -1883,10 +1883,10 @@ public class TransactionManagerTest {
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
         prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, consumerGroupId, pid, epoch);
-        sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
+        sender.runOnce();  // Send AddOffsetsToTxnRequest
         assertFalse(abortResult.isCompleted());
 
-        sender.run(time.milliseconds());
+        sender.runOnce();
         assertTrue(abortResult.isCompleted());
         assertFalse(abortResult.isSuccessful());
         assertTrue(transactionManager.hasFatalError());
@@ -1937,13 +1937,13 @@ public class TransactionManagerTest {
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp1);
         prepareAddPartitionsToTxn(tp1, Errors.NONE);
-        sender.run(time.milliseconds());  // Send AddPartitions, tp1 should be in the transaction now.
+        sender.runOnce();  // Send AddPartitions, tp1 should be in the transaction now.
 
         assertTrue(transactionManager.transactionContainsPartition(tp1));
 
         transactionManager.maybeAddPartitionToTransaction(tp0);
         prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
-        sender.run(time.milliseconds());  // Send AddPartitions, should be in abortable state.
+        sender.runOnce();  // Send AddPartitions, should be in abortable state.
 
         assertTrue(transactionManager.hasAbortableError());
         assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
@@ -2004,14 +2004,14 @@ public class TransactionManagerTest {
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
-        sender.run(time.milliseconds()); // Add partitions
-        sender.run(time.milliseconds()); // Produce
+        sender.runOnce(); // Add partitions
+        sender.runOnce(); // Produce
 
         assertFalse(responseFuture.isDone());
 
         transactionManager.transitionToAbortableError(new KafkaException());
         prepareProduceResponse(Errors.NONE, pid, epoch);
-        sender.run(time.milliseconds());
+        sender.runOnce();
 
         assertTrue(responseFuture.isDone());
         assertNotNull(responseFuture.get()); // the retried produce succeeded, so the future completes normally.
@@ -2036,7 +2036,7 @@ public class TransactionManagerTest {
 
         assertFalse(transactionManager.transactionContainsPartition(tp0));
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
-        sender.run(time.milliseconds());  // send addPartitions.
+        sender.runOnce();  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
         assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
@@ -2050,7 +2050,7 @@ public class TransactionManagerTest {
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
-        sender.run(time.milliseconds());  // We should try to flush the produce, but expire it instead without sending anything.
+        sender.runOnce();  // We should try to flush the produce, but expire it instead without sending anything.
         assertTrue(responseFuture.isDone());
 
         try {
@@ -2089,7 +2089,7 @@ public class TransactionManagerTest {
 
         assertFalse(transactionManager.transactionContainsPartition(tp0));
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
-        sender.run(time.milliseconds());  // send addPartitions.
+        sender.runOnce();  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
         assertTrue(transactionManager.transactionContainsPartition(tp1));
@@ -2106,7 +2106,7 @@ public class TransactionManagerTest {
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
-        sender.run(time.milliseconds());  // We should try to flush the produce, but expire it instead without sending anything.
+        sender.runOnce();  // We should try to flush the produce, but expire it instead without sending anything.
         assertTrue(firstBatchResponse.isDone());
         assertTrue(secondBatchResponse.isDone());
 
@@ -2147,7 +2147,7 @@ public class TransactionManagerTest {
 
         assertFalse(transactionManager.transactionContainsPartition(tp0));
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
-        sender.run(time.milliseconds());  // send addPartitions.
+        sender.runOnce();  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
         assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
@@ -2161,9 +2161,8 @@ public class TransactionManagerTest {
         // expire the batch.
         Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
-        client.blackout(clusterNode, 100);
 
-        sender.run(time.milliseconds());  // We should try to flush the produce, but expire it instead without sending anything.
+        sender.runOnce();  // We should try to flush the produce, but expire it instead without sending anything.
         assertTrue(responseFuture.isDone());
 
         try {
@@ -2173,7 +2172,7 @@ public class TransactionManagerTest {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof TimeoutException);
         }
-        sender.run(time.milliseconds());  // the commit shouldn't be completed without being sent since the produce request failed.
+        sender.runOnce();  // the commit fails without being sent since the produce request failed.
 
         assertTrue(commitResult.isCompleted());
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
@@ -2187,7 +2186,7 @@ public class TransactionManagerTest {
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
 
-        sender.run(time.milliseconds());  // send the abort.
+        sender.runOnce();  // send the abort.
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
@@ -2214,13 +2213,13 @@ public class TransactionManagerTest {
 
         assertFalse(transactionManager.transactionContainsPartition(tp0));
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
-        sender.run(time.milliseconds());  // send addPartitions.
+        sender.runOnce();  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
         assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
 
         prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
-        sender.run(time.milliseconds());  // send the produce request.
+        sender.runOnce();  // send the produce request.
 
         assertFalse(responseFuture.isDone());
 
@@ -2234,7 +2233,7 @@ public class TransactionManagerTest {
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
-        sender.run(time.milliseconds());  // We should try to flush the produce, but expire it instead without sending anything.
+        sender.runOnce();  // We should try to flush the produce, but expire it instead without sending anything.
         assertTrue(responseFuture.isDone());
 
         try {
@@ -2244,8 +2243,8 @@ public class TransactionManagerTest {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof TimeoutException);
         }
-        sender.run(time.milliseconds());  // Transition to fatal error since we have unresolved batches.
-        sender.run(time.milliseconds());  // Fail the queued transactional requests
+        sender.runOnce();  // Transition to fatal error since we have unresolved batches.
+        sender.runOnce();  // Fail the queued transactional requests
 
         assertTrue(commitResult.isCompleted());
         assertFalse(commitResult.isSuccessful());  // the commit should have been dropped.
@@ -2294,7 +2293,7 @@ public class TransactionManagerTest {
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxn(tp0, error);
-        sender.run(time.milliseconds());  // attempt send addPartitions.
+        sender.runOnce();  // attempt send addPartitions.
         assertTrue(transactionManager.hasError());
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
@@ -2434,12 +2433,12 @@ public class TransactionManagerTest {
     private void doInitTransactions(long pid, short epoch) {
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
+        sender.runOnce();  // find coordinator
+        sender.runOnce();
         assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
 
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-        sender.run(time.milliseconds());  // get pid.
+        sender.runOnce();  // get pid.
         assertTrue(transactionManager.hasProducerId());
     }
 
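The Sender loop body itself is outside these hunks; as a sketch of the shape
runOnce() plausibly takes (transactional bookkeeping elided and assumed), the
point of the rename is that the timestamp is sampled inside the iteration
rather than passed in by the caller, so a transactional request can no longer
be sent with a stale time.

    void runOnce() {
        // ... transactional request bookkeeping elided ...
        long currentTimeMs = time.milliseconds();  // clock read inside the iteration
        long pollTimeout = sendProducerData(currentTimeMs);
        client.poll(pollTimeout, currentTimeMs);
    }
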
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index f87e9a5..ee2cfbf 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -16,13 +16,14 @@
   */
 package kafka.server.epoch.util
 
+import java.net.SocketTimeoutException
 import java.util
 import java.util.Collections
 
 import kafka.cluster.BrokerEndPoint
 import kafka.server.BlockingSend
 import org.apache.kafka.clients.MockClient.MockMetadataUpdater
-import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient, NetworkClientUtils}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.AbstractRequest.Builder
@@ -38,7 +39,11 @@ import org.apache.kafka.common.{Node, TopicPartition}
   * OFFSET_FOR_LEADER_EPOCH with different offsets in response, it should update offsets using
   * setOffsetsForNextResponse
   */
-class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
+class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset],
+                                     sourceBroker: BrokerEndPoint,
+                                     time: Time)
+  extends BlockingSend {
+
   private val client = new MockClient(new SystemTime, new MockMetadataUpdater {
     override def fetchNodes(): util.List[Node] = Collections.emptyList()
     override def isUpdateNeeded: Boolean = false
@@ -49,6 +54,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
   var lastUsedOffsetForLeaderEpochVersion = -1
   var callback: Option[() => Unit] = None
   var currentOffsets: java.util.Map[TopicPartition, EpochEndOffset] = offsets
+  private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
 
   def setEpochRequestCallback(postEpochFunction: () => Unit){
     callback = Some(postEpochFunction)
@@ -59,6 +65,8 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
   }
 
   override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
+    if (!NetworkClientUtils.awaitReady(client, sourceNode, time, 500))
+      throw new SocketTimeoutException("Failed to connect within 500 ms")
 
     //Send the request to the mock client
     val clientRequest = request(requestBuilder)
@@ -82,13 +90,13 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
     }
 
     //Use mock client to create the appropriate response object
-    client.respondFrom(response, new Node(destination.id, destination.host, destination.port))
+    client.respondFrom(response, sourceNode)
     client.poll(30, time.milliseconds()).iterator().next()
   }
 
   private def request(requestBuilder: Builder[_ <: AbstractRequest]): ClientRequest = {
     client.newClientRequest(
-      destination.id.toString,
+      sourceBroker.id.toString,
       requestBuilder,
       time.milliseconds(),
       true)