Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/01 04:33:40 UTC

kafka git commit: KAFKA-5340; Batch splitting should preserve magic and transactional flag

Repository: kafka
Updated Branches:
  refs/heads/trunk 6360e04e7 -> e4a6b50de


KAFKA-5340; Batch splitting should preserve magic and transactional flag

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Jiangjie Qin <be...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #3162 from hachikuji/KAFKA-5340


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4a6b50d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4a6b50d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4a6b50d

Branch: refs/heads/trunk
Commit: e4a6b50deca8fabc9880c6764334bfaa830a6d5e
Parents: 6360e04
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 31 21:31:52 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 31 21:31:52 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       | 30 +++++--
 .../producer/internals/RecordAccumulator.java   | 13 +--
 .../clients/producer/internals/Sender.java      |  4 +-
 .../producer/internals/TransactionManager.java  |  9 +++
 .../kafka/common/record/MemoryRecords.java      | 13 +--
 .../common/record/MemoryRecordsBuilder.java     |  7 +-
 .../producer/internals/ProducerBatchTest.java   | 43 +++++++++-
 .../internals/RecordAccumulatorTest.java        |  2 +-
 .../clients/producer/internals/SenderTest.java  | 85 +++++++++++++++-----
 .../common/record/MemoryRecordsBuilderTest.java |  9 +--
 10 files changed, 162 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 974e230..c7253a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.MutableRecordBatch;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
 import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
 
 
@@ -179,23 +181,29 @@ public final class ProducerBatch {
     public Deque<ProducerBatch> split(int splitBatchSize) {
         Deque<ProducerBatch> batches = new ArrayDeque<>();
         MemoryRecords memoryRecords = recordsBuilder.build();
+
         Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();
         if (!recordBatchIter.hasNext())
             throw new IllegalStateException("Cannot split an empty producer batch.");
+
         RecordBatch recordBatch = recordBatchIter.next();
+        if (recordBatch.magic() < MAGIC_VALUE_V2 && !recordBatch.isCompressed())
+            throw new IllegalArgumentException("Batch splitting cannot be used with non-compressed messages " +
+                    "with version v0 and v1");
+
         if (recordBatchIter.hasNext())
-            throw new IllegalStateException("A producer batch should only have one record batch.");
+            throw new IllegalArgumentException("A producer batch should only have one record batch.");
 
         Iterator<Thunk> thunkIter = thunks.iterator();
         // We always allocate batch size because we are already splitting a big batch.
         // We also retain the create time of the original batch.
         ProducerBatch batch = null;
+
         for (Record record : recordBatch) {
             assert thunkIter.hasNext();
             Thunk thunk = thunkIter.next();
-            if (batch == null) {
+            if (batch == null)
                 batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
-            }
 
             // A newly created batch can always host the first message.
             if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
@@ -204,6 +212,7 @@ public final class ProducerBatch {
                 batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
             }
         }
+
         // Close the last batch and add it to the batch list after split.
         if (batch != null)
             batches.add(batch);
@@ -217,11 +226,19 @@ public final class ProducerBatch {
         int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(),
                 record.key(), record.value(), record.headers()), batchSize);
         ByteBuffer buffer = ByteBuffer.allocate(initialSize);
+
+        // Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional)
+        // for the newly created batch. This will be set when the batch is dequeued for sending (which is consistent
+        // with how normal batches are handled).
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
-                TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional());
+                TimestampType.CREATE_TIME, 0L);
         return new ProducerBatch(topicPartition, builder, this.createdMs, true);
     }
 
+    public boolean isCompressed() {
+        return recordsBuilder.compressionType() != CompressionType.NONE;
+    }
+
     /**
      * A callback and the associated FutureRecordMetadata argument to pass to it.
      */
@@ -329,8 +346,9 @@ public final class ProducerBatch {
         return recordsBuilder.isFull();
     }
 
-    public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) {
-        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence);
+    public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
+        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
+                baseSequence, isTransactional);
     }
 
     /**

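The behavioral change in split() is easiest to see from a small, hypothetical harness. This sketch is not part of the commit: the placement in the internals package is assumed so the package-level constructor and accessors are visible, and the append/split calls mirror the ProducerBatchTest changes further down.

    package org.apache.kafka.clients.producer.internals;  // hypothetical placement

    import java.nio.ByteBuffer;
    import java.util.Deque;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class SplitSketch {
        public static void main(String[] args) {
            long now = System.currentTimeMillis();

            // Uncompressed v1 can no longer be split: each record is already its
            // own wire entry, so splitting cannot shrink the produce request.
            MemoryRecordsBuilder v1 = MemoryRecords.builder(ByteBuffer.allocate(1024),
                    RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            ProducerBatch plain = new ProducerBatch(new TopicPartition("topic", 0), v1, now);
            plain.tryAppend(now, "k".getBytes(), "v".getBytes(), Record.EMPTY_HEADERS, null, now);
            try {
                plain.split(512);
            } catch (IllegalArgumentException e) {
                System.out.println("Rejected as expected: " + e.getMessage());
            }

            // A compressed v1 batch splits, and every piece keeps the original
            // magic value and compression type.
            MemoryRecordsBuilder gzip = MemoryRecords.builder(ByteBuffer.allocate(1024),
                    RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, TimestampType.CREATE_TIME, 0L);
            ProducerBatch big = new ProducerBatch(new TopicPartition("topic", 0), gzip, now);
            while (big.tryAppend(now, "k".getBytes(), new byte[256], Record.EMPTY_HEADERS, null, now) != null) {
                // keep appending until the builder reports it is full
            }
            Deque<ProducerBatch> pieces = big.split(512);
            for (ProducerBatch piece : pieces)
                System.out.println("magic=" + piece.magic() + " compressed=" + piece.isCompressed());
        }
    }
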
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 330c244..3f9f4b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -238,10 +238,7 @@ public final class RecordAccumulator {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                     "support the required message format (v2). The broker must be version 0.11 or later.");
         }
-        boolean isTransactional = false;
-        if (transactionManager != null)
-            isTransactional = transactionManager.isInTransaction();
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L, isTransactional);
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
     }
 
     /**
@@ -470,11 +467,17 @@ public final class RecordAccumulator {
                                         break;
                                     } else {
                                         ProducerIdAndEpoch producerIdAndEpoch = null;
+                                        boolean isTransactional = false;
                                         if (transactionManager != null) {
+                                            if (!transactionManager.ensurePartitionAdded(tp))
+                                                break;
+
                                             producerIdAndEpoch = transactionManager.producerIdAndEpoch();
                                             if (!producerIdAndEpoch.isValid())
                                                 // we cannot send the batch until we have refreshed the producer id
                                                 break;
+
+                                            isTransactional = transactionManager.isInTransaction();
                                         }
 
                                         ProducerBatch batch = deque.pollFirst();
@@ -488,7 +491,7 @@ public final class RecordAccumulator {
                                             log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
                                                     node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
                                                     batch.topicPartition, sequenceNumber);
-                                            batch.setProducerState(producerIdAndEpoch, sequenceNumber);
+                                            batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional);
                                         }
                                         batch.close();
                                         size += batch.sizeInBytes();

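The drain-side changes are easiest to understand from the public producer API. The sketch below (standard transactional API, not part of this diff; the broker address and transactional.id are placeholders) shows the normal flow in which the new ensurePartitionAdded() check passes, because send() registers the partition with the TransactionManager before any batch for it can be drained:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TransactionalFlowSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder address
            props.put("transactional.id", "example-txn");      // placeholder id
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                // send() adds the partition to the transaction, so by the time the
                // accumulator drains the batch, ensurePartitionAdded() succeeds and
                // setProducerState() stamps it with the transactional flag.
                producer.send(new ProducerRecord<>("topic", "key", "value"));
                producer.commitTransaction();
            }
        }
    }
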
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index f498f7d..01ff91a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -499,7 +500,8 @@ public class Sender implements Runnable {
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now) {
         Errors error = response.error;
-        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1) {
+        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
+                (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the split batches again. We do not decrement
             // the retry attempts in this case.
             log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",

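Restated in isolation: a batch is only worth splitting when splitting can shrink what goes on the wire. For v2+, many records share one wire batch; for compressed v0/v1, the split halves may land under the limit after recompression; but for uncompressed v0/v1, every record already travels alone, so MESSAGE_TOO_LARGE means a single record is oversized and splitting cannot help. A hypothetical helper capturing the condition (not in the commit):

    package org.apache.kafka.clients.producer.internals;  // hypothetical placement

    import org.apache.kafka.common.record.RecordBatch;

    // Illustrative restatement of the guard Sender.completeBatch() now applies
    // before splitting and re-enqueuing a batch on MESSAGE_TOO_LARGE.
    final class SplitEligibility {
        static boolean canSplit(ProducerBatch batch) {
            return batch.recordCount > 1
                    && (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed());
        }
    }
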
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 30fff86..221816c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -232,6 +232,15 @@ public class TransactionManager {
         return lastError;
     }
 
+    public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
+        if (isInTransaction() && !partitionsInTransaction.contains(tp)) {
+            transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " +
+                    "for partition " + tp + ", which hasn't been added to the transaction yet"));
+            return false;
+        }
+        return true;
+    }
+
     public String transactionalId() {
         return transactionalId;
     }

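A hypothetical sketch of the failure path this guard covers (not in the commit; it assumes the manager has already completed initTransactions() and holds a valid producer id, so beginTransaction() is legal):

    package org.apache.kafka.clients.producer.internals;  // hypothetical placement

    import org.apache.kafka.common.TopicPartition;

    class EnsurePartitionAddedSketch {
        static void demo(TransactionManager txnManager) {
            txnManager.beginTransaction();
            TopicPartition unregistered = new TopicPartition("topic", 0);
            // No maybeAddPartitionToTransaction(unregistered) call was made, so the
            // guard transitions the manager to a fatal error state and returns
            // false, telling the accumulator not to drain the batch.
            boolean drainable = txnManager.ensurePartitionAdded(unregistered);
            assert !drainable;
        }
    }
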
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 1d45635..46798cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -359,14 +359,6 @@ public class MemoryRecords extends AbstractRecords {
                                                byte magic,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
-                                               long baseOffset) {
-        return builder(buffer, magic, compressionType, timestampType, baseOffset, false);
-    }
-
-    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
-                                               byte magic,
-                                               CompressionType compressionType,
-                                               TimestampType timestampType,
                                                long baseOffset,
                                                long logAppendTime) {
         return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
@@ -378,13 +370,12 @@ public class MemoryRecords extends AbstractRecords {
                                                byte magic,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
-                                               long baseOffset,
-                                               boolean isTransactional) {
+                                               long baseOffset) {
         long logAppendTime = RecordBatch.NO_TIMESTAMP;
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             logAppendTime = System.currentTimeMillis();
         return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
-                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional,
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
                 RecordBatch.NO_PARTITION_LEADER_EPOCH);
     }
 

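After this consolidation, the five-argument overload always yields a non-transactional builder; the transactional flag is applied later, if at all, through setProducerState(). A minimal illustrative use (RecordBatch.CURRENT_MAGIC_VALUE stands in for whatever magic value the caller needs):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    class BuilderOverloadSketch {
        // The surviving overload takes no isTransactional flag at construction.
        static MemoryRecordsBuilder newBuilder() {
            return MemoryRecords.builder(ByteBuffer.allocate(1024),
                    RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
                    TimestampType.CREATE_TIME, 0L);
        }
    }
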
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 66560ca..89d314d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -52,7 +52,6 @@ public class MemoryRecordsBuilder {
     private final int initialPosition;
     private final long baseOffset;
     private final long logAppendTime;
-    private final boolean isTransactional;
     private final boolean isControlBatch;
     private final int partitionLeaderEpoch;
     private final int writeLimit;
@@ -60,6 +59,7 @@ public class MemoryRecordsBuilder {
     private volatile float estimatedCompressionRatio;
 
     private boolean appendStreamIsClosed = false;
+    private boolean isTransactional;
     private long producerId;
     private short producerEpoch;
     private int baseSequence;
@@ -196,7 +196,7 @@ public class MemoryRecordsBuilder {
      */
     public MemoryRecords build() {
         if (aborted) {
-            throw new KafkaException("Attempting to build an aborted record batch");
+            throw new IllegalStateException("Attempting to build an aborted record batch");
         }
         close();
         return builtRecords;
@@ -235,7 +235,7 @@ public class MemoryRecordsBuilder {
         }
     }
 
-    public void setProducerState(long producerId, short producerEpoch, int baseSequence) {
+    public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
         if (isClosed()) {
             // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
             // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
@@ -246,6 +246,7 @@ public class MemoryRecordsBuilder {
         this.producerId = producerId;
         this.producerEpoch = producerEpoch;
         this.baseSequence = baseSequence;
+        this.isTransactional = isTransactional;
     }
 
     public void overrideLastOffset(long lastOffset) {

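With the transactional flag now part of the builder's mutable producer state, the drain path stamps producerId, epoch, sequence, and the flag together in one call. An illustrative call site (not in the commit; all values are placeholders):

    import org.apache.kafka.common.record.MemoryRecordsBuilder;

    class ProducerStateSketch {
        // Mirrors what RecordAccumulator.drain() does via ProducerBatch: set the
        // full producer state, including isTransactional, on a still-open builder.
        static void stamp(MemoryRecordsBuilder builder) {
            long producerId = 123456L;
            short producerEpoch = (short) 0;
            int baseSequence = 0;
            builder.setProducerState(producerId, producerEpoch, baseSequence, true);
        }
    }
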
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index da93015..6d2d2f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -28,7 +28,11 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Deque;
 
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -52,9 +56,9 @@ public class ProducerBatchTest {
 
     @Test
     public void testAppendedChecksumMagicV0AndV1() {
-        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) {
+        for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
             MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
-                    CompressionType.NONE, TimestampType.CREATE_TIME, 128);
+                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
             ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
             byte[] key = "hi".getBytes();
             byte[] value = "there".getBytes();
@@ -67,6 +71,41 @@ public class ProducerBatchTest {
         }
     }
 
+    @Test
+    public void testSplitPreservesMagicAndCompressionType() {
+        for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2)) {
+            for (CompressionType compressionType : CompressionType.values()) {
+                if (compressionType == CompressionType.NONE && magic < MAGIC_VALUE_V2)
+                    continue;
+
+                MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic,
+                        compressionType, TimestampType.CREATE_TIME, 0L);
+
+                ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
+                while (true) {
+                    FutureRecordMetadata future = batch.tryAppend(now, "hi".getBytes(), "there".getBytes(),
+                            Record.EMPTY_HEADERS, null, now);
+                    if (future == null)
+                        break;
+                }
+
+                Deque<ProducerBatch> batches = batch.split(512);
+                assertTrue(batches.size() >= 2);
+
+                for (ProducerBatch splitProducerBatch : batches) {
+                    assertEquals(magic, splitProducerBatch.magic());
+                    assertTrue(splitProducerBatch.isSplitBatch());
+
+                    for (RecordBatch splitBatch : splitProducerBatch.records().batches()) {
+                        assertEquals(magic, splitBatch.magic());
+                        assertEquals(0L, splitBatch.baseOffset());
+                        assertEquals(compressionType, splitBatch.compressionType());
+                    }
+                }
+            }
+        }
+    }
+
     /**
      * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create
      * time is interpreted correctly as not expired when the linger time is larger than the difference

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index e079f2a..2875ba2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -584,7 +584,7 @@ public class RecordAccumulatorTest {
         assertNotNull(future1);
         assertNotNull(future2);
         batch.close();
-        // Enqueue the batch to the accumulator so that as if the batch was created by the accumulator.
+        // Enqueue the batch to the accumulator as if the batch was created by the accumulator.
         accum.reenqueue(batch, now);
         time.sleep(101L);
         // Drain the batch.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c1c5a2e..927a937 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -42,7 +42,9 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
@@ -96,6 +98,7 @@ public class SenderTest {
 
     @Before
     public void setup() {
+        client.setNode(cluster.nodes().get(0));
         setupWithTransactionState(null);
     }
 
@@ -548,17 +551,41 @@ public class SenderTest {
     }
 
     @Test
-    public void testSplitBatchAndSend() throws Exception {
+    public void testIdempotentSplitBatchAndSend() throws Exception {
+        TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
+        TransactionManager txnManager = new TransactionManager();
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
+        testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
+    }
+
+    @Test
+    public void testTransactionalSplitBatchAndSend() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
+        TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 60000);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+        sender.run(time.milliseconds());
+
+        testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
+    }
+
+    private void testSplitBatchAndSend(TransactionManager txnManager,
+                                       ProducerIdAndEpoch producerIdAndEpoch,
+                                       TopicPartition tp) throws Exception {
         int maxRetries = 1;
-        String topic = "testSplitBatchAndSend";
+        String topic = tp.topic();
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         Metrics m = new Metrics();
-        TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0);
-        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
-        txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
         accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
-                                            new ApiVersions(), txnManager);
+                new ApiVersions(), txnManager);
         try {
             Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
@@ -566,14 +593,13 @@ public class SenderTest {
             Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
             // Send the first message.
-            final TopicPartition tp2 = new TopicPartition(topic, 1);
             Future<RecordMetadata> f1 =
-                    accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+                    accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
             Future<RecordMetadata> f2 =
-                    accumulator.append(tp2, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+                    accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
-            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.valueOf(id), "localhost", 0);
@@ -581,14 +607,14 @@ public class SenderTest {
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
             Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
-            responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
+            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
             client.respond(new ProduceResponse(responseMap));
             sender.run(time.milliseconds()); // split and reenqueue
             // The compression ratio should have been improved once.
             assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
                     CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
             sender.run(time.milliseconds()); // send produce request
-            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
             assertFalse("The future shouldn't have been done.", f1.isDone());
             assertFalse("The future shouldn't have been done.", f2.isDone());
             id = client.requests().peek().destination();
@@ -597,12 +623,13 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
-            responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
-            client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 0, false), new ProduceResponse(responseMap));
+            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
+            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isInTransaction()),
+                    new ProduceResponse(responseMap));
 
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f1.isDone());
-            assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue());
+            assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp).longValue());
             assertFalse("The future shouldn't have been done.", f2.isDone());
             assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
             sender.run(time.milliseconds()); // send produce request
@@ -612,14 +639,15 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
-            responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
-            client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 1, false), new ProduceResponse(responseMap));
+            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
+            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isInTransaction()),
+                    new ProduceResponse(responseMap));
 
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f2.isDone());
-            assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue());
+            assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             assertEquals("Offset of the second message should be 1", 1L, f2.get().offset());
-            assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp2).isEmpty());
+            assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
 
             assertTrue("There should be a split",
                     m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0);
@@ -713,4 +741,23 @@ public class SenderTest {
         sender.run(time.milliseconds());
     }
 
+    private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE);
+        sender.run(time.milliseconds());
+        sender.run(time.milliseconds());
+
+        prepareInitPidResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.hasProducerId());
+    }
+
+    private void prepareFindCoordinatorResponse(Errors error) {
+        client.prepareResponse(new FindCoordinatorResponse(error, cluster.nodes().get(0)));
+    }
+
+    private void prepareInitPidResponse(Errors error, long pid, short epoch) {
+        client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c621d53..9734f59 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -491,18 +490,18 @@ public class MemoryRecordsBuilderTest {
     }
 
     @Test
-    public void shouldThrowKafkaExceptionOnBuildWhenAborted() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(128);
         buffer.position(bufferOffset);
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
-                                                                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                                                                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
+                RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.abort();
         try {
             builder.build();
             fail("Should have thrown IllegalStateException");
-        } catch (KafkaException e) {
+        } catch (IllegalStateException e) {
             // ok
         }
     }