You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/16 00:18:11 UTC

kafka git commit: KAFKA-5449; Fix race condition on producer dequeuing of EndTxn request

Repository: kafka
Updated Branches:
  refs/heads/trunk 6cc29e321 -> 54a3718a9


KAFKA-5449; Fix race condition on producer dequeuing of EndTxn request

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3348 from hachikuji/fix-has-unflushed-synchronization


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54a3718a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54a3718a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54a3718a

Branch: refs/heads/trunk
Commit: 54a3718a900a5286baf2193713ea8d58ca2c08f6
Parents: 6cc29e3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 16 01:17:54 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 16 01:17:54 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   4 +-
 .../producer/internals/IncompleteBatches.java   |  59 ++++++
 .../producer/internals/ProducerBatch.java       |   2 +-
 .../producer/internals/RecordAccumulator.java   |  64 ++----
 .../clients/producer/internals/Sender.java      |  22 +-
 .../producer/internals/TransactionManager.java  |  26 ++-
 .../internals/RecordAccumulatorTest.java        |  80 +++++++-
 .../internals/TransactionManagerTest.java       | 199 +++++++++++++++----
 8 files changed, 341 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1be30f1..5be20e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -496,7 +496,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void commitTransaction() throws ProducerFencedException {
         if (transactionManager == null)
             throw new IllegalStateException("Cannot commit transaction since transactions are not enabled");
-        TransactionalRequestResult result = transactionManager.beginCommittingTransaction();
+        TransactionalRequestResult result = transactionManager.beginCommit();
         sender.wakeup();
         result.await();
     }
@@ -510,7 +510,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void abortTransaction() throws ProducerFencedException {
         if (transactionManager == null)
             throw new IllegalStateException("Cannot abort transaction since transactions are not enabled.");
-        TransactionalRequestResult result = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult result = transactionManager.beginAbort();
         sender.wakeup();
         result.await();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
new file mode 100644
index 0000000..93b9596
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/*
+ * A thread-safe helper class to hold batches that haven't been acknowledged yet (including those
+ * which have and have not been sent).
+ */
+class IncompleteBatches {
+    private final Set<ProducerBatch> incomplete;
+
+    public IncompleteBatches() {
+        this.incomplete = new HashSet<>();
+    }
+
+    public void add(ProducerBatch batch) {
+        synchronized (incomplete) {
+            this.incomplete.add(batch);
+        }
+    }
+
+    public void remove(ProducerBatch batch) {
+        synchronized (incomplete) {
+            boolean removed = this.incomplete.remove(batch);
+            if (!removed)
+                throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
+        }
+    }
+
+    public Iterable<ProducerBatch> copyAll() {
+        synchronized (incomplete) {
+            return new ArrayList<>(this.incomplete);
+        }
+    }
+
+    public boolean isEmpty() {
+        synchronized (incomplete) {
+            return incomplete.isEmpty();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 9d8b82d..5679c26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -56,7 +56,7 @@ public final class ProducerBatch {
 
     private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
 
-    private enum FinalState { ABORTED, FAILED, SUCCEEDED };
+    private enum FinalState { ABORTED, FAILED, SUCCEEDED }
 
     final long createdMs;
     final TopicPartition topicPartition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 505417c..10c68d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -100,8 +100,8 @@ public final class RecordAccumulator {
      * @param metrics The metrics
      * @param time The time instance to use
      * @param apiVersions Request API versions for current connected brokers
-     * @param transactionManager The shared transaction state object which tracks Pids, epochs, and sequence numbers per
-     *                         partition.
+     * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
+     *                           numbers per partition.
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
@@ -399,9 +399,9 @@ public final class RecordAccumulator {
     }
 
     /**
-     * @return Whether there is any unsent record in the accumulator.
+     * Check whether there are any batches which haven't been drained
      */
-    public boolean hasUnsent() {
+    public boolean hasUndrained() {
         for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
             Deque<ProducerBatch> deque = entry.getValue();
             synchronized (deque) {
@@ -569,19 +569,18 @@ public final class RecordAccumulator {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (ProducerBatch batch : this.incomplete.all())
+            for (ProducerBatch batch : this.incomplete.copyAll())
                 batch.produceFuture.await();
         } finally {
             this.flushesInProgress.decrementAndGet();
         }
     }
 
-    public boolean hasUnflushedBatches() {
-        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches().entrySet()) {
-            if (!entry.getValue().isEmpty())
-                return true;
-        }
-        return !this.incomplete.incomplete.isEmpty();
+    /**
+     * Check whether there are any pending batches (whether sent or unsent).
+     */
+    public boolean hasIncomplete() {
+        return !this.incomplete.isEmpty();
     }
 
     /**
@@ -610,8 +609,11 @@ public final class RecordAccumulator {
         abortBatches(new IllegalStateException("Producer is closed forcefully."));
     }
 
+    /**
+     * Abort all incomplete batches (whether they have been sent or not)
+     */
     void abortBatches(final RuntimeException reason) {
-        for (ProducerBatch batch : incomplete.all()) {
+        for (ProducerBatch batch : incomplete.copyAll()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             synchronized (dq) {
                 batch.abortRecordAppends();
@@ -622,8 +624,11 @@ public final class RecordAccumulator {
         }
     }
 
-    void abortOpenBatches(RuntimeException reason) {
-        for (ProducerBatch batch : incomplete.all()) {
+    /**
+     * Abort any batches which have not been drained
+     */
+    void abortUndrainedBatches(RuntimeException reason) {
+        for (ProducerBatch batch : incomplete.copyAll()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             boolean aborted = false;
             synchronized (dq) {
@@ -685,35 +690,4 @@ public final class RecordAccumulator {
         }
     }
 
-    /*
-     * A threadsafe helper class to hold batches that haven't been ack'd yet
-     */
-    private final static class IncompleteBatches {
-        private final Set<ProducerBatch> incomplete;
-
-        public IncompleteBatches() {
-            this.incomplete = new HashSet<>();
-        }
-
-        public void add(ProducerBatch batch) {
-            synchronized (incomplete) {
-                this.incomplete.add(batch);
-            }
-        }
-
-        public void remove(ProducerBatch batch) {
-            synchronized (incomplete) {
-                boolean removed = this.incomplete.remove(batch);
-                if (!removed)
-                    throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
-            }
-        }
-
-        public Iterable<ProducerBatch> all() {
-            synchronized (incomplete) {
-                return new ArrayList<>(this.incomplete);
-            }
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e2bff27..27eb244 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -170,7 +170,7 @@ public class Sender implements Runnable {
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
+        while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
             try {
                 run(time.milliseconds());
             } catch (Exception e) {
@@ -201,7 +201,7 @@ public class Sender implements Runnable {
             if (!transactionManager.isTransactional()) {
                 // this is an idempotent producer, so make sure we have a producer id
                 maybeWaitForProducerId();
-            } else if (transactionManager.hasInflightRequest() || maybeSendTransactionalRequest(now)) {
+            } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
                 // as long as there are outstanding transactional requests, we simply wait for them to return
                 client.poll(retryBackoffMs, now);
                 return;
@@ -216,7 +216,7 @@ public class Sender implements Runnable {
                 client.poll(retryBackoffMs, now);
                 return;
             } else if (transactionManager.hasAbortableError()) {
-                accumulator.abortOpenBatches(transactionManager.lastError());
+                accumulator.abortUndrainedBatches(transactionManager.lastError());
             }
         }
 
@@ -302,13 +302,9 @@ public class Sender implements Runnable {
     }
 
     private boolean maybeSendTransactionalRequest(long now) {
-        if (transactionManager.isCompletingTransaction() &&
-                !transactionManager.hasPartitionsToAdd() &&
-                accumulator.hasUnflushedBatches()) {
-
-            // If the transaction is being aborted, then we can clear any unsent produce requests
+        if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
             if (transactionManager.isAborting())
-                accumulator.abortOpenBatches(new KafkaException("Failing batch since transaction was aborted"));
+                accumulator.abortUndrainedBatches(new KafkaException("Failing batch since transaction was aborted"));
 
             // There may still be requests left which are being retried. Since we do not know whether they had
             // been successfully appended to the broker log, we must resend them until their final status is clear.
@@ -316,13 +312,9 @@ public class Sender implements Runnable {
             // be correct which would lead to an OutOfSequenceException.
             if (!accumulator.flushInProgress())
                 accumulator.beginFlush();
-
-            // Do not send the EndTxn until all pending batches have been completed
-            if (accumulator.hasUnflushedBatches())
-                return false;
         }
 
-        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
+        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(accumulator.hasIncomplete());
         if (nextRequestHandler == null)
             return false;
 
@@ -377,7 +369,7 @@ public class Sender implements Runnable {
     }
 
     private void maybeAbortBatches(RuntimeException exception) {
-        if (accumulator.hasUnflushedBatches()) {
+        if (accumulator.hasIncomplete()) {
             String logPrefix = "";
             if (transactionManager != null)
                 logPrefix = transactionManager.logPrefix;

http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 5c498db..f2deca3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -173,14 +173,14 @@ public class TransactionManager {
         transitionTo(State.IN_TRANSACTION);
     }
 
-    public synchronized TransactionalRequestResult beginCommittingTransaction() {
+    public synchronized TransactionalRequestResult beginCommit() {
         ensureTransactional();
         maybeFailWithError();
         transitionTo(State.COMMITTING_TRANSACTION);
         return beginCompletingTransaction(TransactionResult.COMMIT);
     }
 
-    public synchronized TransactionalRequestResult beginAbortingTransaction() {
+    public synchronized TransactionalRequestResult beginAbort() {
         ensureTransactional();
         if (currentState != State.ABORTABLE_ERROR)
             maybeFailWithError();
@@ -268,7 +268,7 @@ public class TransactionManager {
         return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty();
     }
 
-    synchronized boolean isCompletingTransaction() {
+    synchronized boolean isCompleting() {
         return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION;
     }
 
@@ -377,18 +377,26 @@ public class TransactionManager {
         sequenceNumbers.put(topicPartition, currentSequenceNumber);
     }
 
-    synchronized TxnRequestHandler nextRequestHandler() {
+    synchronized TxnRequestHandler nextRequestHandler(boolean hasIncompleteBatches) {
         if (!newPartitionsInTransaction.isEmpty())
             enqueueRequest(addPartitionsToTransactionHandler());
 
-        TxnRequestHandler nextRequestHandler = pendingRequests.poll();
-        if (nextRequestHandler != null && maybeTerminateRequestWithError(nextRequestHandler)) {
+        TxnRequestHandler nextRequestHandler = pendingRequests.peek();
+        if (nextRequestHandler == null)
+            return null;
+
+        // Do not send the EndTxn until all batches have been flushed
+        if (nextRequestHandler.isEndTxn() && hasIncompleteBatches)
+            return null;
+
+        pendingRequests.poll();
+        if (maybeTerminateRequestWithError(nextRequestHandler)) {
             log.trace("{}Not sending transactional request {} because we are in an error state",
                     logPrefix, nextRequestHandler.requestBuilder());
             return null;
         }
 
-        if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) {
+        if (nextRequestHandler.isEndTxn() && !transactionStarted) {
             nextRequestHandler.result.done();
             if (currentState != State.FATAL_ERROR) {
                 log.debug("{}Not sending EndTxn for completed transaction since no partitions " +
@@ -432,7 +440,7 @@ public class TransactionManager {
         inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
-    boolean hasInflightRequest() {
+    boolean hasInFlightRequest() {
         return inFlightRequestCorrelationId != NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
@@ -459,7 +467,7 @@ public class TransactionManager {
     // visible for testing
     synchronized boolean hasOngoingTransaction() {
         // transactions are considered ongoing once started until completion or a fatal error
-        return currentState == State.IN_TRANSACTION || isCompletingTransaction() || hasAbortableError();
+        return currentState == State.IN_TRANSACTION || isCompleting() || hasAbortableError();
     }
 
     // visible for testing

http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 3929156..81487b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -322,8 +323,10 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE;
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
                 CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
-        for (int i = 0; i < 100; i++)
+        for (int i = 0; i < 100; i++) {
             accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+            assertTrue(accum.hasIncomplete());
+        }
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
 
@@ -332,13 +335,16 @@ public class RecordAccumulatorTest {
 
         // drain and deallocate all batches
         Map<Integer, List<ProducerBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertTrue(accum.hasIncomplete());
+
         for (List<ProducerBatch> batches: results.values())
             for (ProducerBatch batch: batches)
                 accum.deallocate(batch);
 
         // should be complete with no unsent records.
         accum.awaitFlushCompletion();
-        assertFalse(accum.hasUnsent());
+        assertFalse(accum.hasUndrained());
+        assertFalse(accum.hasIncomplete());
     }
 
 
@@ -369,12 +375,13 @@ public class RecordAccumulatorTest {
         }
     }
 
-
     @Test
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
+        int numRecords = 100;
+
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
+        final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
                 CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         class TestCallback implements Callback {
             @Override
@@ -383,14 +390,71 @@ public class RecordAccumulatorTest {
                 numExceptionReceivedInCallback.incrementAndGet();
             }
         }
-        for (int i = 0; i < 100; i++)
+        for (int i = 0; i < numRecords; i++)
             accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
-        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+        assertFalse(result.readyNodes.isEmpty());
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertTrue(accum.hasUndrained());
+        assertTrue(accum.hasIncomplete());
+
+        int numDrainedRecords = 0;
+        for (Map.Entry<Integer, List<ProducerBatch>> drainedEntry : drained.entrySet()) {
+            for (ProducerBatch batch : drainedEntry.getValue()) {
+                assertTrue(batch.isClosed());
+                assertFalse(batch.produceFuture.completed());
+                numDrainedRecords += batch.recordCount;
+            }
+        }
 
+        assertTrue(numDrainedRecords > 0 && numDrainedRecords < numRecords);
         accum.abortIncompleteBatches();
-        assertEquals(numExceptionReceivedInCallback.get(), 100);
-        assertFalse(accum.hasUnsent());
+        assertEquals(numRecords, numExceptionReceivedInCallback.get());
+        assertFalse(accum.hasUndrained());
+        assertFalse(accum.hasIncomplete());
+    }
+
+    @Test
+    public void testAbortUnsentBatches() throws Exception {
+        long lingerMs = Long.MAX_VALUE;
+        int numRecords = 100;
+
+        final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
+        final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
+        final KafkaException cause = new KafkaException();
+
+        class TestCallback implements Callback {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                assertEquals(cause, exception);
+                numExceptionReceivedInCallback.incrementAndGet();
+            }
+        }
+        for (int i = 0; i < numRecords; i++)
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertFalse(result.readyNodes.isEmpty());
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
+                time.milliseconds());
+        assertTrue(accum.hasUndrained());
+        assertTrue(accum.hasIncomplete());
+
+        accum.abortUndrainedBatches(cause);
+        int numDrainedRecords = 0;
+        for (Map.Entry<Integer, List<ProducerBatch>> drainedEntry : drained.entrySet()) {
+            for (ProducerBatch batch : drainedEntry.getValue()) {
+                assertTrue(batch.isClosed());
+                assertFalse(batch.produceFuture.completed());
+                numDrainedRecords += batch.recordCount;
+            }
+        }
+
+        assertTrue(numDrainedRecords > 0);
+        assertTrue(numExceptionReceivedInCallback.get() > 0);
+        assertEquals(numRecords, numExceptionReceivedInCallback.get() + numDrainedRecords);
+        assertFalse(accum.hasUndrained());
+        assertTrue(accum.hasIncomplete());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index bdad483..14b2835 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -124,6 +124,23 @@ public class TransactionManagerTest {
         client.setNode(brokerNode);
     }
 
+    @Test
+    public void testEndTxnNotSentIfIncompleteBatches() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+
+        transactionManager.beginCommit();
+        assertNull(transactionManager.nextRequestHandler(true));
+        assertTrue(transactionManager.nextRequestHandler(false).isEndTxn());
+    }
+
     @Test(expected = IllegalStateException.class)
     public void testFailIfNotReadyForSendNoProducerId() {
         transactionManager.failIfNotReadyForSend();
@@ -188,7 +205,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxn(partition, Errors.NONE);
         sender.run(time.milliseconds());
 
-        transactionManager.beginAbortingTransaction();
+        transactionManager.beginAbort();
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
@@ -215,7 +232,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxn(partition, Errors.NONE);
         sender.run(time.milliseconds());
 
-        transactionManager.beginCommittingTransaction();
+        transactionManager.beginCommit();
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
@@ -245,7 +262,7 @@ public class TransactionManagerTest {
         transactionManager.transitionToAbortableError(new KafkaException());
         assertTrue(transactionManager.hasOngoingTransaction());
 
-        transactionManager.beginAbortingTransaction();
+        transactionManager.beginAbort();
         assertTrue(transactionManager.hasOngoingTransaction());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
@@ -529,12 +546,12 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasPendingOffsetCommits());
         assertTrue(addOffsetsResult.isCompleted());  // We should only be done after both RPCs complete.
 
-        transactionManager.beginCommittingTransaction();
+        transactionManager.beginCommit();
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         sender.run(time.milliseconds());  // commit.
 
         assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.isCompletingTransaction());
+        assertFalse(transactionManager.isCompleting());
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
 
@@ -866,7 +883,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
 
         assertTrue(transactionManager.hasAbortableError());
-        transactionManager.beginAbortingTransaction();
+        transactionManager.beginAbort();
         sender.run(time.milliseconds());
         assertTrue(responseFuture.isDone());
         assertFutureFailed(responseFuture);
@@ -875,7 +892,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         assertTrue(transactionManager.isReady());
         assertFalse(transactionManager.hasPartitionsToAdd());
-        assertFalse(accumulator.hasUnflushedBatches());
+        assertFalse(accumulator.hasIncomplete());
 
         // ensure we can now start a new transaction
 
@@ -890,7 +907,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.hasPartitionsToAdd());
 
-        transactionManager.beginCommittingTransaction();
+        transactionManager.beginCommit();
         prepareProduceResponse(Errors.NONE, pid, epoch);
         sender.run(time.milliseconds());
 
@@ -932,14 +949,14 @@ public class TransactionManagerTest {
         assertFalse(unauthorizedTopicProduceFuture.isDone());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
-        transactionManager.beginAbortingTransaction();
+        transactionManager.beginAbort();
         sender.run(time.milliseconds());
         // neither produce request has been sent, so they should both be failed immediately
         assertFutureFailed(authorizedTopicProduceFuture);
         assertFutureFailed(unauthorizedTopicProduceFuture);
         assertTrue(transactionManager.isReady());
         assertFalse(transactionManager.hasPartitionsToAdd());
-        assertFalse(accumulator.hasUnflushedBatches());
+        assertFalse(accumulator.hasIncomplete());
 
         // ensure we can now start a new transaction
 
@@ -954,7 +971,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.hasPartitionsToAdd());
 
-        transactionManager.beginCommittingTransaction();
+        transactionManager.beginCommit();
         prepareProduceResponse(Errors.NONE, pid, epoch);
         sender.run(time.milliseconds());
 
@@ -988,7 +1005,7 @@ public class TransactionManagerTest {
         prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
         sender.run(time.milliseconds());
         assertFalse(authorizedTopicProduceFuture.isDone());
-        assertTrue(accumulator.hasUnflushedBatches());
+        assertTrue(accumulator.hasIncomplete());
 
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
         Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
@@ -1007,12 +1024,12 @@ public class TransactionManagerTest {
         assertNotNull(authorizedTopicProduceFuture.get());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
-        transactionManager.beginAbortingTransaction();
+        transactionManager.beginAbort();
         sender.run(time.milliseconds());
         // neither produce request has been sent, so they should both be failed immediately
         assertTrue(transactionManager.isReady());
         assertFalse(transactionManager.hasPartitionsToAdd());
-        assertFalse(accumulator.hasUnflushedBatches());
+        assertFalse(accumulator.hasIncomplete());
 
         // ensure we can now start a new transaction
 
@@ -1027,7 +1044,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isPartitionAdded(tp0));
         assertFalse(transactionManager.hasPartitionsToAdd());
 
-        transactionManager.beginCommittingTransaction();
+        transactionManager.beginCommit();
         prepareProduceResponse(Errors.NONE, pid, epoch);
         sender.run(time.milliseconds());
 
@@ -1075,7 +1092,7 @@ public class TransactionManagerTest {
 
         assertFalse(responseFuture.isDone());
 
-        TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
 
         // we have an append, an add partitions request, and now also an endtxn.
         // The order should be:
@@ -1097,7 +1114,7 @@ public class TransactionManagerTest {
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         assertFalse(commitResult.isCompleted());
         assertTrue(transactionManager.hasOngoingTransaction());
-        assertTrue(transactionManager.isCompletingTransaction());
+        assertTrue(transactionManager.isCompleting());
 
         sender.run(time.milliseconds());
         assertTrue(commitResult.isCompleted());
@@ -1193,7 +1210,7 @@ public class TransactionManagerTest {
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
-        TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
@@ -1219,7 +1236,7 @@ public class TransactionManagerTest {
         }
 
         // Commit is not allowed, so let's abort and try again.
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
         sender.run(time.milliseconds());  // Send abort request. It is valid to transition from ERROR to ABORT
 
@@ -1249,7 +1266,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // Send AddPartitionsRequest
         sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
         sender.run(time.milliseconds());  // try to abort
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
@@ -1275,7 +1292,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // Send AddPartitionsRequest
         sender.run(time.milliseconds());  // Send Produce Request
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
         assertTrue(transactionManager.isAborting());
         assertFalse(transactionManager.hasError());
 
@@ -1294,6 +1311,110 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+        sender.run(time.milliseconds());
+        assertTrue(accumulator.hasUndrained());
+
+        // committing the transaction should cause the unsent batch to be flushed
+        transactionManager.beginCommit();
+        sender.run(time.milliseconds());
+        assertFalse(accumulator.hasUndrained());
+        assertTrue(accumulator.hasIncomplete());
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(responseFuture.isDone());
+
+        // until the produce future returns, we will not send EndTxn
+        sender.run(time.milliseconds());
+        assertFalse(accumulator.hasUndrained());
+        assertTrue(accumulator.hasIncomplete());
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(responseFuture.isDone());
+
+        // now the produce response returns
+        sendProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+        assertTrue(responseFuture.isDone());
+        assertFalse(accumulator.hasUndrained());
+        assertFalse(accumulator.hasIncomplete());
+        assertFalse(transactionManager.hasInFlightRequest());
+
+        // now we send EndTxn
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.hasInFlightRequest());
+        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertTrue(transactionManager.isReady());
+    }
+
+    @Test
+    public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+        sender.run(time.milliseconds());
+        assertTrue(accumulator.hasUndrained());
+
+        accumulator.beginFlush();
+        sender.run(time.milliseconds());
+        assertFalse(accumulator.hasUndrained());
+        assertTrue(accumulator.hasIncomplete());
+        assertFalse(transactionManager.hasInFlightRequest());
+
+        // now we begin the commit with the produce request still pending
+        transactionManager.beginCommit();
+        sender.run(time.milliseconds());
+        assertFalse(accumulator.hasUndrained());
+        assertTrue(accumulator.hasIncomplete());
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(responseFuture.isDone());
+
+        // until the produce future returns, we will not send EndTxn
+        sender.run(time.milliseconds());
+        assertFalse(accumulator.hasUndrained());
+        assertTrue(accumulator.hasIncomplete());
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(responseFuture.isDone());
+
+        // now the produce response returns
+        sendProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+        assertTrue(responseFuture.isDone());
+        assertFalse(accumulator.hasUndrained());
+        assertFalse(accumulator.hasIncomplete());
+        assertFalse(transactionManager.hasInFlightRequest());
+
+        // now we send EndTxn
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.hasInFlightRequest());
+        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertTrue(transactionManager.isReady());
+    }
+
+    @Test
     public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
         final long pid = 13131L;
         final short epoch = 1;
@@ -1336,7 +1457,7 @@ public class TransactionManagerTest {
 
         assertFalse(responseFuture.isDone());
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
         // note since no partitions were added to the transaction, no EndTxn will be sent
 
         sender.run(time.milliseconds());  // try to abort
@@ -1369,7 +1490,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // Send AddPartitions and let it fail
         assertFalse(responseFuture.isDone());
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
         // we should resend the AddPartitions
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
@@ -1410,7 +1531,7 @@ public class TransactionManagerTest {
 
         assertFalse(responseFuture.isDone());
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
         // we should resend the ProduceRequest before aborting
         prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
@@ -1518,7 +1639,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid);
         sender.run(time.milliseconds());  // Send AddPartitionsRequest
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
         assertFalse(abortResult.isCompleted());
 
         sender.run(time.milliseconds());
@@ -1540,7 +1661,7 @@ public class TransactionManagerTest {
 
         transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
         prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
         sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
@@ -1566,7 +1687,7 @@ public class TransactionManagerTest {
 
         transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
         prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch);
         sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
@@ -1839,7 +1960,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
         assertFalse(responseFuture.isDone());
 
-        TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
 
         // Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
         time.sleep(10000);
@@ -1866,10 +1987,10 @@ public class TransactionManagerTest {
 
         assertTrue(transactionManager.hasAbortableError());
         assertTrue(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.isCompletingTransaction());
+        assertFalse(transactionManager.isCompleting());
         assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
 
@@ -1995,7 +2116,15 @@ public class TransactionManagerTest {
     }
 
     private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
+        client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error));
+    }
+
+    private void sendEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
+        client.respond(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error));
+    }
+
+    private MockClient.RequestMatcher endTxnMatcher(final TransactionResult result, final long pid, final short epoch) {
+        return new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
                 EndTxnRequest endTxnRequest = (EndTxnRequest) body;
@@ -2005,7 +2134,7 @@ public class TransactionManagerTest {
                 assertEquals(result, endTxnRequest.command());
                 return true;
             }
-        }, new EndTxnResponse(0, error));
+        };
     }
 
     private void prepareAddOffsetsToTxnResponse(Errors error, final String consumerGroupId, final long producerId,
@@ -2066,7 +2195,7 @@ public class TransactionManagerTest {
         }
 
         assertTrue(transactionManager.hasError());
-        transactionManager.beginAbortingTransaction();
+        transactionManager.beginAbort();
         assertFalse(transactionManager.hasError());
     }
 
@@ -2074,7 +2203,7 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasError());
 
         try {
-            transactionManager.beginAbortingTransaction();
+            transactionManager.beginAbort();
             fail("Should have raised " + cause.getSimpleName());
         } catch (KafkaException e) {
             assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
@@ -2083,7 +2212,7 @@ public class TransactionManagerTest {
 
         // Transaction abort cannot clear fatal error state
         try {
-            transactionManager.beginAbortingTransaction();
+            transactionManager.beginAbort();
             fail("Should have raised " + cause.getSimpleName());
         } catch (KafkaException e) {
             assertTrue(cause.isAssignableFrom(e.getCause().getClass()));