You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 03:54:08 UTC

kafka git commit: KAFKA-5251; Producer should cancel unsent AddPartitions and Produce requests on abort

Repository: kafka
Updated Branches:
  refs/heads/trunk 3250cc767 -> d41cf1b77


KAFKA-5251; Producer should cancel unsent AddPartitions and Produce requests on abort

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3161 from hachikuji/KAFKA-5251


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d41cf1b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d41cf1b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d41cf1b7

Branch: refs/heads/trunk
Commit: d41cf1b77819ede5716b31683d0137eb60cb7bfb
Parents: 3250cc7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue May 30 20:32:51 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 30 20:32:51 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   2 +-
 .../producer/internals/RecordAccumulator.java   |  19 +-
 .../clients/producer/internals/Sender.java      |  66 +++---
 .../producer/internals/TransactionManager.java  |  62 +++---
 .../common/requests/AddOffsetsToTxnRequest.java |   3 +-
 .../requests/AddPartitionsToTxnRequest.java     |   7 +-
 .../kafka/common/requests/EndTxnRequest.java    |   3 +-
 .../kafka/common/requests/FetchRequest.java     |   7 +-
 .../common/requests/TxnOffsetCommitRequest.java |   3 +-
 .../clients/producer/internals/SenderTest.java  |   2 +-
 .../internals/TransactionManagerTest.java       | 209 +++++++++++++------
 .../kafka/api/TransactionsTest.scala            |   1 +
 12 files changed, 259 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4fcbcc8..dc6b911 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -697,7 +697,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
                     "when transactions are enabled.");
 
-        if (transactionManager.isInErrorState()) {
+        if (transactionManager.hasError()) {
             Exception lastError = transactionManager.lastError();
             throw new KafkaException("Cannot perform send because at least one previous transactional or " +
                     "idempotent request has failed with errors.", lastError);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d3d1b82..330c244 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -621,7 +621,6 @@ public final class RecordAccumulator {
     void abortBatches(final RuntimeException reason) {
         for (ProducerBatch batch : incomplete.all()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
-            // Close the batch before aborting
             synchronized (dq) {
                 batch.abort();
                 dq.remove(batch);
@@ -631,6 +630,24 @@ public final class RecordAccumulator {
         }
     }
 
+    void abortUnclosedBatches(RuntimeException reason) {
+        for (ProducerBatch batch : incomplete.all()) {
+            Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
+            boolean aborted = false;
+            synchronized (dq) {
+                if (!batch.isClosed()) {
+                    aborted = true;
+                    batch.abort();
+                    dq.remove(batch);
+                }
+            }
+            if (aborted) {
+                batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason);
+                deallocate(batch);
+            }
+        }
+    }
+
     public void mutePartition(TopicPartition tp) {
         muted.add(tp);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3fa5903..f498f7d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -46,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
@@ -204,7 +206,7 @@ public class Sender implements Runnable {
 
             // do not continue sending if the transaction manager is in a failed state or if there
             // is no producer id (for the idempotent case).
-            if (transactionManager.isInErrorState() || !transactionManager.hasProducerId()) {
+            if (transactionManager.hasError() || !transactionManager.hasProducerId()) {
                 RuntimeException lastError = transactionManager.lastError();
                 if (lastError != null)
                     maybeAbortBatches(lastError);
@@ -295,24 +297,38 @@ public class Sender implements Runnable {
     }
 
     private boolean maybeSendTransactionalRequest(long now) {
-        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
-        if (nextRequestHandler == null) {
-            log.trace("TransactionalId: {} -- There are no pending transactional requests to send",
-                    transactionManager.transactionalId());
-            return false;
-        }
-
-        if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
+        String transactionalId = transactionManager.transactionalId();
+        if (transactionManager.isCompletingTransaction() &&
+                !transactionManager.hasPartitionsToAdd() &&
+                accumulator.hasUnflushedBatches()) {
+
+            // If the transaction is being aborted, then we can clear any unsent produce requests
+            if (transactionManager.isAborting())
+                accumulator.abortUnclosedBatches(new KafkaException("Failing batch since transaction was aborted"));
+
+            // There may still be requests left which are being retried. Since we do not know whether they had
+            // been successfully appended to the broker log, we must resend them until their final status is clear.
+            // If they had been appended and we did not receive the error, then our sequence number would no longer
+            // be correct which would lead to an OutOfSequenceException.
             if (!accumulator.flushInProgress())
                 accumulator.beginFlush();
-            transactionManager.reenqueue(nextRequestHandler);
-            log.trace("TransactionalId: {} -- Going to wait for pending ProducerBatches to flush before sending an " +
-                    "end transaction request", transactionManager.transactionalId());
+
+            // Do not send the EndTxn until all pending batches have been completed
+            if (accumulator.hasUnflushedBatches()) {
+                log.trace("TransactionalId: {} -- Waiting for pending batches to be flushed before completing transaction",
+                        transactionalId);
+                return false;
+            }
+        }
+
+        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
+        if (nextRequestHandler == null) {
+            log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionalId);
             return false;
         }
 
-        log.debug("TransactionalId: {} -- Sending transactional request {}", transactionManager.transactionalId(),
-                nextRequestHandler.requestBuilder());
+        AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
+        log.trace("TransactionalId: {} -- Preparing to send request {}", transactionalId, requestBuilder);
 
         while (true) {
             Node targetNode = null;
@@ -332,25 +348,25 @@ public class Sender implements Runnable {
                 }
                 if (targetNode != null) {
                     if (nextRequestHandler.isRetry()) {
-                        log.trace("TransactionalId: {} -- Waiting {}ms before resending a transactional request {}",
-                                transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder());
+                        log.trace("TransactionalId: {} -- Waiting {}ms before resending request {}",
+                                transactionalId,
+                                retryBackoffMs, requestBuilder);
                         time.sleep(retryBackoffMs);
                     }
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
-                            nextRequestHandler.requestBuilder(), now, true, nextRequestHandler);
+                            requestBuilder, now, true, nextRequestHandler);
                     transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
-                    log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(),
-                            nextRequestHandler.requestBuilder(), clientRequest.destination());
+                    log.debug("TransactionalId: {} -- Sending transactional request {} to node {}",
+                            transactionalId, requestBuilder, clientRequest.destination());
                     client.send(clientRequest, now);
                     return true;
                 }
             } catch (IOException e) {
-                log.debug("TransactionalId: {} -- Disconnect from {} while trying to send transactional " +
-                                "request {}. Going to back off and retry", transactionManager.transactionalId(),
-                        targetNode, nextRequestHandler.requestBuilder());
+                log.debug("TransactionalId: {} -- Disconnect from {} while trying to send request {}. Going " +
+                                "to back off and retry", transactionalId, targetNode, requestBuilder);
             }
-            log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.",
-                    transactionManager.transactionalId(), retryBackoffMs);
+            log.trace("TransactionalId: {} -- About to wait for {}ms before trying to send another request.",
+                    transactionalId, retryBackoffMs);
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
@@ -402,7 +418,7 @@ public class Sender implements Runnable {
     }
 
     private void maybeWaitForProducerId() {
-        while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) {
+        while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
             try {
                 Node node = awaitLeastLoadedNodeReady(requestTimeout);
                 if (node != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index ec7ced2..30fff86 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -67,8 +67,8 @@ public class TransactionManager {
 
     private final Map<TopicPartition, Integer> sequenceNumbers;
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
-    private final Set<TopicPartition> newPartitionsToBeAddedToTransaction;
-    private final Set<TopicPartition> pendingPartitionsToBeAddedToTransaction;
+    private final Set<TopicPartition> newPartitionsInTransaction;
+    private final Set<TopicPartition> pendingPartitionsInTransaction;
     private final Set<TopicPartition> partitionsInTransaction;
     private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
 
@@ -139,8 +139,8 @@ public class TransactionManager {
         this.transactionTimeoutMs = transactionTimeoutMs;
         this.transactionCoordinator = null;
         this.consumerGroupCoordinator = null;
-        this.newPartitionsToBeAddedToTransaction = new HashSet<>();
-        this.pendingPartitionsToBeAddedToTransaction = new HashSet<>();
+        this.newPartitionsInTransaction = new HashSet<>();
+        this.pendingPartitionsInTransaction = new HashSet<>();
         this.partitionsInTransaction = new HashSet<>();
         this.pendingTxnOffsetCommits = new HashMap<>();
         this.pendingRequests = new PriorityQueue<>(10, new Comparator<TxnRequestHandler>() {
@@ -184,11 +184,14 @@ public class TransactionManager {
         if (currentState != State.ABORTABLE_ERROR)
             maybeFailWithError();
         transitionTo(State.ABORTING_TRANSACTION);
+
+        // We're aborting the transaction, so there should be no need to add new partitions
+        newPartitionsInTransaction.clear();
         return beginCompletingTransaction(false);
     }
 
     private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
-        if (!newPartitionsToBeAddedToTransaction.isEmpty()) {
+        if (!newPartitionsInTransaction.isEmpty()) {
             pendingRequests.add(addPartitionsToTransactionHandler());
         }
 
@@ -222,7 +225,7 @@ public class TransactionManager {
         if (partitionsInTransaction.contains(topicPartition))
             return;
 
-        newPartitionsToBeAddedToTransaction.add(topicPartition);
+        newPartitionsInTransaction.add(topicPartition);
     }
 
     public RuntimeException lastError() {
@@ -241,14 +244,22 @@ public class TransactionManager {
         return transactionalId != null;
     }
 
+    public synchronized boolean hasPartitionsToAdd() {
+        return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty();
+    }
+
     public synchronized boolean isCompletingTransaction() {
         return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION;
     }
 
-    public synchronized boolean isInErrorState() {
+    public synchronized boolean hasError() {
         return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
     }
 
+    public synchronized boolean isAborting() {
+        return currentState == State.ABORTING_TRANSACTION;
+    }
+
     synchronized boolean isInTransaction() {
         return currentState == State.IN_TRANSACTION || isCompletingTransaction();
     }
@@ -334,7 +345,7 @@ public class TransactionManager {
     }
 
     synchronized TxnRequestHandler nextRequestHandler() {
-        if (!newPartitionsToBeAddedToTransaction.isEmpty())
+        if (!newPartitionsInTransaction.isEmpty())
             pendingRequests.add(addPartitionsToTransactionHandler());
 
         TxnRequestHandler nextRequestHandler = pendingRequests.poll();
@@ -345,15 +356,16 @@ public class TransactionManager {
         }
 
         if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) {
-            ((EndTxnHandler) nextRequestHandler).result.done();
+            nextRequestHandler.result.done();
             if (currentState != State.FATAL_ERROR) {
+                log.debug("TransactionId: {} -- Not sending EndTxn for completed transaction since no partitions " +
+                        "or offsets were successfully added", transactionalId);
                 completeTransaction();
             }
             return pendingRequests.poll();
         }
 
         return nextRequestHandler;
-
     }
 
     synchronized void retry(TxnRequestHandler request) {
@@ -403,7 +415,7 @@ public class TransactionManager {
     }
 
     // visible for testing
-    synchronized boolean isReadyForTransaction() {
+    synchronized boolean isReady() {
         return isTransactional() && currentState == State.READY;
     }
 
@@ -434,12 +446,12 @@ public class TransactionManager {
     }
 
     private void maybeFailWithError() {
-        if (isInErrorState())
+        if (hasError())
             throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
     }
 
     private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
-        if (isInErrorState()) {
+        if (hasError()) {
             if (requestHandler instanceof EndTxnHandler) {
                 // we allow abort requests to break out of the error state. The state and the last error
                 // will be cleared when the request returns
@@ -477,10 +489,10 @@ public class TransactionManager {
     }
 
     private synchronized TxnRequestHandler addPartitionsToTransactionHandler() {
-        pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction);
-        newPartitionsToBeAddedToTransaction.clear();
+        pendingPartitionsInTransaction.addAll(newPartitionsInTransaction);
+        newPartitionsInTransaction.clear();
         AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
-                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsInTransaction));
         return new AddPartitionsToTxnHandler(builder);
     }
 
@@ -497,7 +509,7 @@ public class TransactionManager {
         return new TxnOffsetCommitHandler(result, builder);
     }
 
-    abstract class TxnRequestHandler implements  RequestCompletionHandler {
+    abstract class TxnRequestHandler implements RequestCompletionHandler {
         protected final TransactionalRequestResult result;
         private boolean isRetry = false;
 
@@ -658,12 +670,13 @@ public class TransactionManager {
             log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}",
                     transactionalId, errors);
 
-            for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) {
-                final Errors error = errors.get(topicPartition);
-                if (error == Errors.NONE || error == null) {
+            for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors.entrySet()) {
+                TopicPartition topicPartition = topicPartitionErrorEntry.getKey();
+                Errors error = topicPartitionErrorEntry.getValue();
+
+                if (error == Errors.NONE) {
                     continue;
-                }
-                if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
+                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                     lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                     reenqueue();
                     return;
@@ -695,8 +708,9 @@ public class TransactionManager {
             } else if (hasPartitionErrors) {
                 abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
             } else {
-                partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
-                pendingPartitionsToBeAddedToTransaction.clear();
+                Set<TopicPartition> addedPartitions = errors.keySet();
+                partitionsInTransaction.addAll(addedPartitions);
+                pendingPartitionsInTransaction.removeAll(addedPartitions);
                 transactionStarted = true;
                 result.done();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 3339470..36b290f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -54,7 +54,8 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
-            bld.append("(transactionalId=").append(transactionalId).
+            bld.append("(type=AddOffsetsToTxnRequest").
+                    append(", transactionalId=").append(transactionalId).
                     append(", producerId=").append(producerId).
                     append(", producerEpoch=").append(producerEpoch).
                     append(", consumerGroupId=").append(consumerGroupId).

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index e24fa5a..6fe034c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -55,10 +55,15 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
             return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
         }
 
+        public List<TopicPartition> partitions() {
+            return partitions;
+        }
+
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
-            bld.append("(transactionalId=").append(transactionalId).
+            bld.append("(type=AddPartitionsToTxnRequest").
+                    append(", transactionalId=").append(transactionalId).
                     append(", producerId=").append(producerId).
                     append(", producerEpoch=").append(producerEpoch).
                     append(", partitions=").append(partitions).

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index b9f052c..01d73b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -54,7 +54,8 @@ public class EndTxnRequest extends AbstractRequest {
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
-            bld.append("(transactionalId=").append(transactionalId).
+            bld.append("(type=EndTxnRequest").
+                    append(", transactionalId=").append(transactionalId).
                     append(", producerId=").append(producerId).
                     append(", producerEpoch=").append(producerEpoch).
                     append(", result=").append(result).

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index fc7d53c..39c027b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -151,7 +151,7 @@ public class FetchRequest extends AbstractRequest {
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
-            bld.append("(type:FetchRequest").
+            bld.append("(type=FetchRequest").
                     append(", replicaId=").append(replicaId).
                     append(", maxWait=").append(maxWait).
                     append(", minBytes=").append(minBytes).
@@ -163,11 +163,6 @@ public class FetchRequest extends AbstractRequest {
     }
 
     private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes,
-                         LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-        this(version, replicaId, maxWait, minBytes, maxBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
-    }
-
-    private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes,
                          LinkedHashMap<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel) {
         super(version);
         this.replicaId = replicaId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 68fa3d2..2ea8ecf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -67,7 +67,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
-            bld.append("(transactionalId=").append(transactionalId).
+            bld.append("(type=TxnOffsetCommitRequest").
+                    append(", transactionalId=").append(transactionalId).
                     append(", producerId=").append(producerId).
                     append(", producerEpoch=").append(producerEpoch).
                     append(", consumerGroupId=").append(consumerGroupId).

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 77b1da8..faa6ea5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -403,7 +403,7 @@ public class SenderTest {
         client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);
         assertFalse(transactionManager.hasProducerId());
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof ClusterAuthorizationException);
 
         // cluster authorization is a fatal error for the producer

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a1bd970..ed7ec84 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -59,7 +60,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -69,6 +69,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -79,7 +80,7 @@ import static org.junit.Assert.fail;
 public class TransactionManagerTest {
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
     private static final short ACKS_ALL = -1;
-    private static final int MAX_RETRIES = 0;
+    private static final int MAX_RETRIES = Integer.MAX_VALUE;
     private static final String CLIENT_ID = "clientId";
     private static final int MAX_BLOCK_TIMEOUT = 1000;
     private static final int REQUEST_TIMEOUT = 1000;
@@ -109,19 +110,8 @@ public class TransactionManagerTest {
         this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs);
         Metrics metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
-        this.sender = new Sender(this.client,
-                this.metadata,
-                this.accumulator,
-                true,
-                MAX_REQUEST_SIZE,
-                ACKS_ALL,
-                MAX_RETRIES,
-                metrics,
-                this.time,
-                REQUEST_TIMEOUT,
-                50,
-                transactionManager,
-                apiVersions);
+        this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+                MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
         client.setNode(brokerNode);
     }
@@ -163,7 +153,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                                                                   "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -271,7 +261,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // find coordinator
         sender.run(time.milliseconds());
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
 
         sender.run(time.milliseconds()); // one more run to fail the InitProducerId future
@@ -294,7 +284,7 @@ public class TransactionManagerTest {
         prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, RecordBatch.NO_PRODUCER_EPOCH);
         sender.run(time.milliseconds());
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(initPidResult.isCompleted());
         assertFalse(initPidResult.isSuccessful());
         assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException);
@@ -321,7 +311,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId);
         sender.run(time.milliseconds());  // FindCoordinator Failed
         sender.run(time.milliseconds());  // TxnOffsetCommit Aborted
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
         assertTrue(sendOffsetsResult.isCompleted());
         assertFalse(sendOffsetsResult.isSuccessful());
@@ -356,7 +346,7 @@ public class TransactionManagerTest {
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED));
         sender.run(time.milliseconds());  // TxnOffsetCommit Handled
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
         assertTrue(sendOffsetsResult.isCompleted());
         assertFalse(sendOffsetsResult.isSuccessful());
@@ -384,7 +374,7 @@ public class TransactionManagerTest {
         prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
         sender.run(time.milliseconds());  // AddOffsetsToTxn Handled
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
         assertTrue(sendOffsetsResult.isCompleted());
         assertFalse(sendOffsetsResult.isSuccessful());
@@ -416,7 +406,7 @@ public class TransactionManagerTest {
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
         sender.run(time.milliseconds());  // TxnOffsetCommit Handled
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
         assertTrue(sendOffsetsResult.isCompleted());
         assertFalse(sendOffsetsResult.isSuccessful());
@@ -439,7 +429,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxn(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
         sender.run(time.milliseconds());
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
 
         TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError();
@@ -462,7 +452,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
         sender.run(time.milliseconds());
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
 
         assertFatalError(TransactionalIdAuthorizationException.class);
@@ -584,7 +574,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // send produce.
 
         assertTrue(responseFuture.isDone());
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         responseFuture.get();
     }
 
@@ -608,16 +598,22 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // Send AddPartitionsRequest
         assertFalse(commitResult.isCompleted());
-
         sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.
+
         sender.run(time.milliseconds());  // try to commit.
         assertTrue(commitResult.isCompleted());  // commit should be cancelled with exception without being sent.
 
         try {
             commitResult.await();
             fail();  // the get() must throw an exception.
-        } catch (RuntimeException e) {
-            assertTrue(e instanceof KafkaException);
+        } catch (KafkaException e) {
+        }
+
+        try {
+            responseFuture.get();
+            fail("Expected produce future to raise an exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
         }
 
         // Commit is not allowed, so let's abort and try again.
@@ -627,7 +623,7 @@ public class TransactionManagerTest {
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
-        assertTrue(transactionManager.isReadyForTransaction());  // make sure we are ready for a transaction now.
+        assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
     }
 
     @Test
@@ -643,20 +639,125 @@ public class TransactionManagerTest {
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
 
         sender.run(time.milliseconds());  // Send AddPartitionsRequest
-        assertFalse(abortResult.isCompleted());
-
         sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        sender.run(time.milliseconds());  // try to abort
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+        assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
+    }
+
+    @Test
+    public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        // note since no partitions were added to the transaction, no EndTxn will be sent
+
         sender.run(time.milliseconds());  // try to abort
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
-        assertTrue(transactionManager.isReadyForTransaction());  // make sure we are ready for a transaction now.
+        assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
+
+        try {
+            responseFuture.get();
+            fail("Expected produce future to raise an exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof KafkaException);
+        }
+    }
+
+    @Test
+    public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
+        final long producerId = 13131L;
+        final short producerEpoch = 1;
+
+        doInitTransactions(producerId, producerEpoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        sender.run(time.milliseconds());  // Send AddPartitions and let it fail
+        assertFalse(responseFuture.isDone());
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+        // we should resend the AddPartitions
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
+
+        sender.run(time.milliseconds());  // Resend AddPartitions
+        sender.run(time.milliseconds());  // Send EndTxn
+
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+        assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
+
+        try {
+            responseFuture.get();
+            fail("Expected produce future to raise an exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof KafkaException);
+        }
+    }
+
+    @Test
+    public void testAbortResendsProduceRequestIfRetried() throws Exception {
+        final long producerId = 13131L;
+        final short producerEpoch = 1;
+
+        doInitTransactions(producerId, producerEpoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
+        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        sender.run(time.milliseconds());  // Send AddPartitions
+        sender.run(time.milliseconds());  // Send ProduceRequest and let it fail
+
+        assertFalse(responseFuture.isDone());
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+        // we should resend the ProduceRequest before aborting
+        prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
+
+        sender.run(time.milliseconds());  // Resend ProduceRequest
+        sender.run(time.milliseconds());  // Send EndTxn
+
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+        assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
+
+        RecordMetadata recordMetadata = responseFuture.get();
+        assertEquals(tp0.topic(), recordMetadata.topic());
     }
 
     @Test
@@ -739,27 +840,18 @@ public class TransactionManagerTest {
 
     @Test
     public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-        sender.run(time.milliseconds());  // get pid.
+        doInitTransactions(pid, epoch);
 
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
-
         prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid);
         sender.run(time.milliseconds());  // Send AddPartitionsRequest
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
         assertFalse(abortResult.isCompleted());
 
         sender.run(time.milliseconds());
@@ -769,19 +861,10 @@ public class TransactionManagerTest {
 
     @Test
     public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception {
-        client.setNode(brokerNode);
-        // This is called from the initTransactions method in the producer as the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
-        transactionManager.initializeTransactions();
-        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
 
-        sender.run(time.milliseconds());  // find coordinator
-        sender.run(time.milliseconds());
-
-        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
-        sender.run(time.milliseconds());  // get pid.
+        doInitTransactions(pid, epoch);
 
         transactionManager.beginTransaction();
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -815,7 +898,7 @@ public class TransactionManagerTest {
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxn(tp0, error);
         sender.run(time.milliseconds());  // attempt send addPartitions.
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
 
@@ -883,7 +966,7 @@ public class TransactionManagerTest {
                 AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body;
                 assertEquals(pid, addPartitionsToTxnRequest.producerId());
                 assertEquals(epoch, addPartitionsToTxnRequest.producerEpoch());
-                assertEquals(Arrays.asList(topicPartition), addPartitionsToTxnRequest.partitions());
+                assertEquals(singletonList(topicPartition), addPartitionsToTxnRequest.partitions());
                 assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
                 return true;
             }
@@ -958,23 +1041,23 @@ public class TransactionManagerTest {
             fail("Should have raised " + cause.getSimpleName());
         } catch (KafkaException e) {
             assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
-            assertTrue(transactionManager.isInErrorState());
+            assertTrue(transactionManager.hasError());
         }
 
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
         transactionManager.beginAbortingTransaction();
-        assertFalse(transactionManager.isInErrorState());
+        assertFalse(transactionManager.hasError());
     }
 
     private void assertFatalError(Class<? extends RuntimeException> cause) {
-        assertTrue(transactionManager.isInErrorState());
+        assertTrue(transactionManager.hasError());
 
         try {
             transactionManager.beginAbortingTransaction();
             fail("Should have raised " + cause.getSimpleName());
         } catch (KafkaException e) {
             assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
-            assertTrue(transactionManager.isInErrorState());
+            assertTrue(transactionManager.hasError());
         }
 
         // Transaction abort cannot clear fatal error state
@@ -983,7 +1066,7 @@ public class TransactionManagerTest {
             fail("Should have raised " + cause.getSimpleName());
         } catch (KafkaException e) {
             assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
-            assertTrue(transactionManager.isInErrorState());
+            assertTrue(transactionManager.hasError());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index fd9d884..205dc6e 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -70,6 +70,7 @@ class TransactionsTest extends KafkaServerTestHarness {
       producer.beginTransaction()
       producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
       producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
+      producer.flush()
       producer.abortTransaction()
 
       producer.beginTransaction()