You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/06 12:59:18 UTC

kafka git commit: KAFKA-5364; ensurePartitionAdded does not handle pending partitions in abortable error state [Forced Update!]

Repository: kafka
Updated Branches:
  refs/heads/trunk 3eed19458 -> cb78ba129 (forced update)


KAFKA-5364; ensurePartitionAdded does not handle pending partitions in abortable error state

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3231 from hachikuji/KAFKA-5364


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb78ba12
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb78ba12
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb78ba12

Branch: refs/heads/trunk
Commit: cb78ba1294d1a27b2c4d842a125486ffec593d98
Parents: 313f8d7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Jun 6 13:57:20 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 6 13:58:51 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  16 +-
 .../producer/internals/RecordAccumulator.java   |   4 +-
 .../producer/internals/TransactionManager.java  |  78 +++--
 .../internals/RecordAccumulatorTest.java        |   1 -
 .../clients/producer/internals/SenderTest.java  |   4 +-
 .../internals/TransactionManagerTest.java       | 342 ++++++++++++++++++-
 6 files changed, 381 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1d16721..0f109d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -608,7 +608,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
         if (transactionManager != null)
-            ensureProperTransactionalState();
+            transactionManager.failIfUnreadyForSend();
 
         TopicPartition tp = null;
         try {
@@ -691,20 +691,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private void ensureProperTransactionalState() {
-        if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
-            throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
-                    "when transactions are enabled.");
-
-        if (transactionManager.hasError()) {
-            Exception lastError = transactionManager.lastError();
-            throw new KafkaException("Cannot perform send because at least one previous transactional or " +
-                    "idempotent request has failed with errors.", lastError);
-        }
-        if (transactionManager.isCompletingTransaction())
-            throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");
-    }
-
     private void setReadOnly(Headers headers) {
         if (headers instanceof RecordHeaders) {
             ((RecordHeaders) headers).setReadOnly();

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 2c4917d..0315b13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -469,7 +469,7 @@ public final class RecordAccumulator {
                                         ProducerIdAndEpoch producerIdAndEpoch = null;
                                         boolean isTransactional = false;
                                         if (transactionManager != null) {
-                                            if (!transactionManager.ensurePartitionAdded(tp))
+                                            if (!transactionManager.sendToPartitionAllowed(tp))
                                                 break;
 
                                             producerIdAndEpoch = transactionManager.producerIdAndEpoch();
@@ -477,7 +477,7 @@ public final class RecordAccumulator {
                                                 // we cannot send the batch until we have refreshed the producer id
                                                 break;
 
-                                            isTransactional = transactionManager.isInTransaction();
+                                            isTransactional = transactionManager.isTransactional();
                                         }
 
                                         ProducerBatch batch = deque.pollFirst();

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c081b23..2842cd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -205,7 +205,7 @@ public class TransactionManager {
     }
 
     public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                                           String consumerGroupId) {
+                                                                            String consumerGroupId) {
         ensureTransactional();
         maybeFailWithError();
         if (currentState != State.IN_TRANSACTION)
@@ -221,34 +221,39 @@ public class TransactionManager {
     }
 
     public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
-        if (!isInTransaction())
-            throw new IllegalArgumentException("Cannot add partitions to a transaction in state " + currentState);
+        if (currentState != State.IN_TRANSACTION)
+            throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);
 
-        if (partitionsInTransaction.contains(topicPartition))
+        if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition))
             return;
 
         log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
         newPartitionsInTransaction.add(topicPartition);
     }
 
-    public RuntimeException lastError() {
+    RuntimeException lastError() {
         return lastError;
     }
 
-    public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
+    public synchronized void failIfUnreadyForSend() {
+        if (hasError())
+            throw new KafkaException("Cannot perform send because at least one previous transactional or " +
+                    "idempotent request has failed with errors.", lastError);
+
+        if (isTransactional()) {
+            if (!hasProducerId())
+                throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
+                        "when transactions are enabled.");
+
+            if (currentState != State.IN_TRANSACTION)
+                throw new IllegalStateException("Cannot call send in state " + currentState);
+        }
+    }
+
+    synchronized boolean sendToPartitionAllowed(TopicPartition tp) {
         if (hasFatalError())
             return false;
-        if (isInTransaction() || hasAbortableError()) {
-            // We should enter this branch in an error state because if this partition is already in the transaction,
-            // there is a chance that the corresponding batch is in retry. So we must let it completely flush.
-            if (!(partitionsInTransaction.contains(tp) || isPartitionPending(tp))) {
-                transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " +
-                        "for partition " + tp + ", which would never be added to the transaction."));
-                return false;
-            }
-            return partitionsInTransaction.contains(tp);
-        }
-        return true;
+        return !isTransactional() || partitionsInTransaction.contains(tp);
     }
 
     public String transactionalId() {
@@ -263,26 +268,22 @@ public class TransactionManager {
         return transactionalId != null;
     }
 
-    public synchronized boolean hasPartitionsToAdd() {
+    synchronized boolean hasPartitionsToAdd() {
         return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty();
     }
 
-    public synchronized boolean isCompletingTransaction() {
+    synchronized boolean isCompletingTransaction() {
         return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION;
     }
 
-    public synchronized boolean hasError() {
+    synchronized boolean hasError() {
         return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
     }
 
-    public synchronized boolean isAborting() {
+    synchronized boolean isAborting() {
         return currentState == State.ABORTING_TRANSACTION;
     }
 
-    synchronized boolean isInTransaction() {
-        return currentState == State.IN_TRANSACTION || isCompletingTransaction();
-    }
-
     synchronized void transitionToAbortableError(RuntimeException exception) {
         transitionTo(State.ABORTABLE_ERROR, exception);
     }
@@ -291,6 +292,16 @@ public class TransactionManager {
         transitionTo(State.FATAL_ERROR, exception);
     }
 
+    // visible for testing
+    synchronized boolean isPartitionAdded(TopicPartition partition) {
+        return partitionsInTransaction.contains(partition);
+    }
+
+    // visible for testing
+    synchronized boolean isPartitionPendingAdd(TopicPartition partition) {
+        return newPartitionsInTransaction.contains(partition) || pendingPartitionsInTransaction.contains(partition);
+    }
+
     /**
      * Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
      * verify that the result is valid.
@@ -437,21 +448,23 @@ public class TransactionManager {
 
     // visible for testing
     synchronized boolean transactionContainsPartition(TopicPartition topicPartition) {
-        return isInTransaction() && partitionsInTransaction.contains(topicPartition);
+        return partitionsInTransaction.contains(topicPartition);
     }
 
     // visible for testing
     synchronized boolean hasPendingOffsetCommits() {
-        return isInTransaction() && !pendingTxnOffsetCommits.isEmpty();
+        return !pendingTxnOffsetCommits.isEmpty();
     }
 
     // visible for testing
-    synchronized boolean isReady() {
-        return isTransactional() && currentState == State.READY;
+    synchronized boolean hasOngoingTransaction() {
+        // transactions are considered ongoing once started until completion or a fatal error
+        return currentState == State.IN_TRANSACTION || isCompletingTransaction() || hasAbortableError();
     }
 
-    private synchronized boolean isPartitionPending(TopicPartition tp) {
-        return isInTransaction() && (pendingPartitionsInTransaction.contains(tp) || newPartitionsInTransaction.contains(tp));
+    // visible for testing
+    synchronized boolean isReady() {
+        return isTransactional() && currentState == State.READY;
     }
 
     private void transitionTo(State target) {
@@ -472,8 +485,7 @@ public class TransactionManager {
         }
 
         if (lastError != null)
-            log.error("{}Transition from state {} to error state {}", logPrefix, currentState,
-                    target, lastError);
+            log.debug("{}Transition from state {} to error state {}", logPrefix, currentState, target, lastError);
         else
             log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 2875ba2..7b9f26b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -394,7 +394,6 @@ public class RecordAccumulatorTest {
         accum.abortIncompleteBatches();
         assertEquals(numExceptionReceivedInCallback.get(), 100);
         assertFalse(accum.hasUnsent());
-
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 927a937..26093d7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -624,7 +624,7 @@ public class SenderTest {
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
             responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
-            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isInTransaction()),
+            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
             sender.run(time.milliseconds()); // receive
@@ -640,7 +640,7 @@ public class SenderTest {
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
             responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
-            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isInTransaction()),
+            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
             sender.run(time.milliseconds()); // receive

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6e633ec..f661baf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -123,6 +123,329 @@ public class TransactionManagerTest {
     }
 
     @Test(expected = IllegalStateException.class)
+    public void testFailIfUnreadyForSendNoProducerId() {
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test
+    public void testFailIfUnreadyForSendIdempotentProducer() {
+        TransactionManager idempotentTransactionManager = new TransactionManager();
+        idempotentTransactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testFailIfUnreadyForSendIdempotentProducerFatalError() {
+        TransactionManager idempotentTransactionManager = new TransactionManager();
+        idempotentTransactionManager.transitionToFatalError(new KafkaException());
+        idempotentTransactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testFailIfUnreadyForSendNoOngoingTransaction() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testFailIfUnreadyForSendAfterAbortableError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        transactionManager.transitionToAbortableError(new KafkaException());
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testFailIfUnreadyForSendAfterFatalError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.transitionToFatalError(new KafkaException());
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test
+    public void testHasOngoingTransactionSuccessfulAbort() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.beginAbortingTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testHasOngoingTransactionSuccessfulCommit() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.beginCommittingTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testHasOngoingTransactionAbortableError() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.transitionToAbortableError(new KafkaException());
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginAbortingTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testHasOngoingTransactionFatalError() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.transitionToFatalError(new KafkaException());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testMaybeAddPartitionToTransaction() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasPartitionsToAdd());
+        assertFalse(transactionManager.isPartitionAdded(partition));
+        assertTrue(transactionManager.isPartitionPendingAdd(partition));
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertTrue(transactionManager.isPartitionAdded(partition));
+        assertFalse(transactionManager.isPartitionPendingAdd(partition));
+
+        // adding the partition again should not have any effect
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertTrue(transactionManager.isPartitionAdded(partition));
+        assertFalse(transactionManager.isPartitionPendingAdd(partition));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
+        transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionAfterAbortableError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        transactionManager.transitionToAbortableError(new KafkaException());
+        transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionAfterFatalError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.transitionToFatalError(new KafkaException());
+        transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        transactionManager.transitionToAbortableError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        // Send the AddPartitionsToTxn request and leave it in-flight
+        sender.run(time.milliseconds());
+        transactionManager.transitionToAbortableError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        transactionManager.transitionToFatalError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        // Send the AddPartitionsToTxn request and leave it in-flight
+        sender.run(time.milliseconds());
+        transactionManager.transitionToFatalError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        transactionManager.transitionToAbortableError(new KafkaException());
+
+        assertTrue(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        transactionManager.transitionToFatalError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithUnaddedPartition() {
+        final long pid = 13131L;
+        final short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+    }
+
+    @Test(expected = IllegalStateException.class)
     public void testInvalidSequenceIncrement() {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.incrementSequenceNumber(tp0, 3333);
@@ -148,8 +471,6 @@ public class TransactionManagerTest {
 
     @Test
     public void testBasicTransaction() throws InterruptedException {
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -166,11 +487,11 @@ public class TransactionManagerTest {
 
         prepareProduceResponse(Errors.NONE, pid, epoch);
         assertFalse(transactionManager.transactionContainsPartition(tp0));
-        assertFalse(transactionManager.ensurePartitionAdded(tp0));
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
         sender.run(time.milliseconds());  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
-        assertTrue(transactionManager.ensurePartitionAdded(tp0));
+        assertTrue(transactionManager.sendToPartitionAllowed(tp0));
         assertFalse(responseFuture.isDone());
 
         sender.run(time.milliseconds());  // send produce request.
@@ -210,7 +531,7 @@ public class TransactionManagerTest {
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         sender.run(time.milliseconds());  // commit.
 
-        assertFalse(transactionManager.isInTransaction());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompletingTransaction());
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
@@ -507,12 +828,12 @@ public class TransactionManagerTest {
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         assertFalse(commitResult.isCompleted());
-        assertTrue(transactionManager.isInTransaction());
+        assertTrue(transactionManager.hasOngoingTransaction());
         assertTrue(transactionManager.isCompletingTransaction());
 
         sender.run(time.milliseconds());
         assertTrue(commitResult.isCompleted());
-        assertFalse(transactionManager.isInTransaction());
+        assertFalse(transactionManager.hasOngoingTransaction());
     }
 
     @Test
@@ -910,8 +1231,8 @@ public class TransactionManagerTest {
         accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
 
-        assertFalse(transactionManager.ensurePartitionAdded(tp0));
-        assertFalse(transactionManager.ensurePartitionAdded(tp1));
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.sendToPartitionAllowed(tp1));
 
         Node node1 = new Node(0, "localhost", 1111);
         Node node2 = new Node(1, "localhost", 1112);
@@ -951,7 +1272,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // Send AddPartitions, should be in abortable state.
 
         assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.ensurePartitionAdded(tp1));
+        assertTrue(transactionManager.sendToPartitionAllowed(tp1));
 
         // Try to drain a message destined for tp1, it should get drained.
         Node node1 = new Node(1, "localhost", 1112);
@@ -993,7 +1314,6 @@ public class TransactionManagerTest {
         // We shouldn't drain batches which haven't been added to the transaction yet.
         assertTrue(drainedBatches.containsKey(node1.id()));
         assertTrue(drainedBatches.get(node1.id()).isEmpty());
-        assertTrue(transactionManager.hasFatalError());
     }
 
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {