You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/01 04:33:40 UTC
kafka git commit: KAFKA-5340; Batch splitting should preserve magic and transactional flag
Repository: kafka
Updated Branches:
refs/heads/trunk 6360e04e7 -> e4a6b50de
KAFKA-5340; Batch splitting should preserve magic and transactional flag
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Jiangjie Qin <be...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #3162 from hachikuji/KAFKA-5340
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4a6b50d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4a6b50d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4a6b50d
Branch: refs/heads/trunk
Commit: e4a6b50deca8fabc9880c6764334bfaa830a6d5e
Parents: 6360e04
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 31 21:31:52 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 31 21:31:52 2017 -0700
----------------------------------------------------------------------
.../producer/internals/ProducerBatch.java | 30 +++++--
.../producer/internals/RecordAccumulator.java | 13 +--
.../clients/producer/internals/Sender.java | 4 +-
.../producer/internals/TransactionManager.java | 9 +++
.../kafka/common/record/MemoryRecords.java | 13 +--
.../common/record/MemoryRecordsBuilder.java | 7 +-
.../producer/internals/ProducerBatchTest.java | 43 +++++++++-
.../internals/RecordAccumulatorTest.java | 2 +-
.../clients/producer/internals/SenderTest.java | 85 +++++++++++++++-----
.../common/record/MemoryRecordsBuilderTest.java | 9 +--
10 files changed, 162 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 974e230..c7253a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
@@ -43,6 +44,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
@@ -179,23 +181,29 @@ public final class ProducerBatch {
public Deque<ProducerBatch> split(int splitBatchSize) {
Deque<ProducerBatch> batches = new ArrayDeque<>();
MemoryRecords memoryRecords = recordsBuilder.build();
+
Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();
if (!recordBatchIter.hasNext())
throw new IllegalStateException("Cannot split an empty producer batch.");
+
RecordBatch recordBatch = recordBatchIter.next();
+ if (recordBatch.magic() < MAGIC_VALUE_V2 && !recordBatch.isCompressed())
+ throw new IllegalArgumentException("Batch splitting cannot be used with non-compressed messages " +
+ "with version v0 and v1");
+
if (recordBatchIter.hasNext())
- throw new IllegalStateException("A producer batch should only have one record batch.");
+ throw new IllegalArgumentException("A producer batch should only have one record batch.");
Iterator<Thunk> thunkIter = thunks.iterator();
// We always allocate batch size because we are already splitting a big batch.
// And we also Retain the create time of the original batch.
ProducerBatch batch = null;
+
for (Record record : recordBatch) {
assert thunkIter.hasNext();
Thunk thunk = thunkIter.next();
- if (batch == null) {
+ if (batch == null)
batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
- }
// A newly created batch can always host the first message.
if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
@@ -204,6 +212,7 @@ public final class ProducerBatch {
batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
}
}
+
// Close the last batch and add it to the batch list after split.
if (batch != null)
batches.add(batch);
@@ -217,11 +226,19 @@ public final class ProducerBatch {
int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(),
record.key(), record.value(), record.headers()), batchSize);
ByteBuffer buffer = ByteBuffer.allocate(initialSize);
+
+ // Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional)
+ // for the newly created batch. This will be set when the batch is dequeued for sending (which is consistent
+ // with how normal batches are handled).
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
- TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional());
+ TimestampType.CREATE_TIME, 0L);
return new ProducerBatch(topicPartition, builder, this.createdMs, true);
}
+ public boolean isCompressed() {
+ return recordsBuilder.compressionType() != CompressionType.NONE;
+ }
+
/**
* A callback and the associated FutureRecordMetadata argument to pass to it.
*/
@@ -329,8 +346,9 @@ public final class ProducerBatch {
return recordsBuilder.isFull();
}
- public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) {
- recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence);
+ public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
+ recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
+ baseSequence, isTransactional);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 330c244..3f9f4b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -238,10 +238,7 @@ public final class RecordAccumulator {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
- boolean isTransactional = false;
- if (transactionManager != null)
- isTransactional = transactionManager.isInTransaction();
- return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L, isTransactional);
+ return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
}
/**
@@ -470,11 +467,17 @@ public final class RecordAccumulator {
break;
} else {
ProducerIdAndEpoch producerIdAndEpoch = null;
+ boolean isTransactional = false;
if (transactionManager != null) {
+ if (!transactionManager.ensurePartitionAdded(tp))
+ break;
+
producerIdAndEpoch = transactionManager.producerIdAndEpoch();
if (!producerIdAndEpoch.isValid())
// we cannot send the batch until we have refreshed the producer id
break;
+
+ isTransactional = transactionManager.isInTransaction();
}
ProducerBatch batch = deque.pollFirst();
@@ -488,7 +491,7 @@ public final class RecordAccumulator {
log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
batch.topicPartition, sequenceNumber);
- batch.setProducerState(producerIdAndEpoch, sequenceNumber);
+ batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional);
}
batch.close();
size += batch.sizeInBytes();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index f498f7d..01ff91a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -499,7 +500,8 @@ public class Sender implements Runnable {
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
long now) {
Errors error = response.error;
- if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1) {
+ if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
+ (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
// If the batch is too large, we split the batch and send the split batches again. We do not decrement
// the retry attempts in this case.
log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}",
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 30fff86..221816c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -232,6 +232,15 @@ public class TransactionManager {
return lastError;
}
+ public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
+ if (isInTransaction() && !partitionsInTransaction.contains(tp)) {
+ transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " +
+ "for partition " + tp + ", which hasn't been added to the transaction yet"));
+ return false;
+ }
+ return true;
+ }
+
public String transactionalId() {
return transactionalId;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 1d45635..46798cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -359,14 +359,6 @@ public class MemoryRecords extends AbstractRecords {
byte magic,
CompressionType compressionType,
TimestampType timestampType,
- long baseOffset) {
- return builder(buffer, magic, compressionType, timestampType, baseOffset, false);
- }
-
- public static MemoryRecordsBuilder builder(ByteBuffer buffer,
- byte magic,
- CompressionType compressionType,
- TimestampType timestampType,
long baseOffset,
long logAppendTime) {
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
@@ -378,13 +370,12 @@ public class MemoryRecords extends AbstractRecords {
byte magic,
CompressionType compressionType,
TimestampType timestampType,
- long baseOffset,
- boolean isTransactional) {
+ long baseOffset) {
long logAppendTime = RecordBatch.NO_TIMESTAMP;
if (timestampType == TimestampType.LOG_APPEND_TIME)
logAppendTime = System.currentTimeMillis();
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional,
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 66560ca..89d314d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -52,7 +52,6 @@ public class MemoryRecordsBuilder {
private final int initialPosition;
private final long baseOffset;
private final long logAppendTime;
- private final boolean isTransactional;
private final boolean isControlBatch;
private final int partitionLeaderEpoch;
private final int writeLimit;
@@ -60,6 +59,7 @@ public class MemoryRecordsBuilder {
private volatile float estimatedCompressionRatio;
private boolean appendStreamIsClosed = false;
+ private boolean isTransactional;
private long producerId;
private short producerEpoch;
private int baseSequence;
@@ -196,7 +196,7 @@ public class MemoryRecordsBuilder {
*/
public MemoryRecords build() {
if (aborted) {
- throw new KafkaException("Attempting to build an aborted record batch");
+ throw new IllegalStateException("Attempting to build an aborted record batch");
}
close();
return builtRecords;
@@ -235,7 +235,7 @@ public class MemoryRecordsBuilder {
}
}
- public void setProducerState(long producerId, short producerEpoch, int baseSequence) {
+ public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
if (isClosed()) {
// Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
// If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
@@ -246,6 +246,7 @@ public class MemoryRecordsBuilder {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.baseSequence = baseSequence;
+ this.isTransactional = isTransactional;
}
public void overrideLastOffset(long lastOffset) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index da93015..6d2d2f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -28,7 +28,11 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Deque;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -52,9 +56,9 @@ public class ProducerBatchTest {
@Test
public void testAppendedChecksumMagicV0AndV1() {
- for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) {
+ for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
- CompressionType.NONE, TimestampType.CREATE_TIME, 128);
+ CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
byte[] key = "hi".getBytes();
byte[] value = "there".getBytes();
@@ -67,6 +71,41 @@ public class ProducerBatchTest {
}
}
+ @Test
+ public void testSplitPreservesMagicAndCompressionType() {
+ for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2)) {
+ for (CompressionType compressionType : CompressionType.values()) {
+ if (compressionType == CompressionType.NONE && magic < MAGIC_VALUE_V2)
+ continue;
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic,
+ compressionType, TimestampType.CREATE_TIME, 0L);
+
+ ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
+ while (true) {
+ FutureRecordMetadata future = batch.tryAppend(now, "hi".getBytes(), "there".getBytes(),
+ Record.EMPTY_HEADERS, null, now);
+ if (future == null)
+ break;
+ }
+
+ Deque<ProducerBatch> batches = batch.split(512);
+ assertTrue(batches.size() >= 2);
+
+ for (ProducerBatch splitProducerBatch : batches) {
+ assertEquals(magic, splitProducerBatch.magic());
+ assertTrue(splitProducerBatch.isSplitBatch());
+
+ for (RecordBatch splitBatch : splitProducerBatch.records().batches()) {
+ assertEquals(magic, splitBatch.magic());
+ assertEquals(0L, splitBatch.baseOffset());
+ assertEquals(compressionType, splitBatch.compressionType());
+ }
+ }
+ }
+ }
+ }
+
/**
* A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create
* time is interpreted correctly as not expired when the linger time is larger than the difference
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index e079f2a..2875ba2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -584,7 +584,7 @@ public class RecordAccumulatorTest {
assertNotNull(future1);
assertNotNull(future2);
batch.close();
- // Enqueue the batch to the accumulator so that as if the batch was created by the accumulator.
+ // Enqueue the batch to the accumulator as if the batch was created by the accumulator.
accum.reenqueue(batch, now);
time.sleep(101L);
// Drain the batch.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c1c5a2e..927a937 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -42,7 +42,9 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
@@ -96,6 +98,7 @@ public class SenderTest {
@Before
public void setup() {
+ client.setNode(cluster.nodes().get(0));
setupWithTransactionState(null);
}
@@ -548,17 +551,41 @@ public class SenderTest {
}
@Test
- public void testSplitBatchAndSend() throws Exception {
+ public void testIdempotentSplitBatchAndSend() throws Exception {
+ TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
+ TransactionManager txnManager = new TransactionManager();
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+ txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
+ testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
+ }
+
+ @Test
+ public void testTransactionalSplitBatchAndSend() throws Exception {
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+ TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
+ TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 60000);
+
+ setupWithTransactionState(txnManager);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+
+ txnManager.beginTransaction();
+ txnManager.maybeAddPartitionToTransaction(tp);
+ client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+ sender.run(time.milliseconds());
+
+ testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
+ }
+
+ private void testSplitBatchAndSend(TransactionManager txnManager,
+ ProducerIdAndEpoch producerIdAndEpoch,
+ TopicPartition tp) throws Exception {
int maxRetries = 1;
- String topic = "testSplitBatchAndSend";
+ String topic = tp.topic();
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
Metrics m = new Metrics();
- TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0);
- ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
- txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
- new ApiVersions(), txnManager);
+ new ApiVersions(), txnManager);
try {
Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
@@ -566,14 +593,13 @@ public class SenderTest {
Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
// Send the first message.
- final TopicPartition tp2 = new TopicPartition(topic, 1);
Future<RecordMetadata> f1 =
- accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+ accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
Future<RecordMetadata> f2 =
- accumulator.append(tp2, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+ accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
- assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+ assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
String id = client.requests().peek().destination();
assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
Node node = new Node(Integer.valueOf(id), "localhost", 0);
@@ -581,14 +607,14 @@ public class SenderTest {
assertTrue("Client ready status should be true", client.isReady(node, 0L));
Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
- responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
+ responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
client.respond(new ProduceResponse(responseMap));
sender.run(time.milliseconds()); // split and reenqueue
// The compression ratio should have been improved once.
assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
sender.run(time.milliseconds()); // send produce request
- assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+ assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
assertFalse("The future shouldn't have been done.", f1.isDone());
assertFalse("The future shouldn't have been done.", f2.isDone());
id = client.requests().peek().destination();
@@ -597,12 +623,13 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
- responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
- client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 0, false), new ProduceResponse(responseMap));
+ responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
+ client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isInTransaction()),
+ new ProduceResponse(responseMap));
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f1.isDone());
- assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue());
+ assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp).longValue());
assertFalse("The future shouldn't have been done.", f2.isDone());
assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
sender.run(time.milliseconds()); // send produce request
@@ -612,14 +639,15 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
- responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
- client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 1, false), new ProduceResponse(responseMap));
+ responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
+ client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isInTransaction()),
+ new ProduceResponse(responseMap));
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f2.isDone());
- assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue());
+ assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
- assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp2).isEmpty());
+ assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
assertTrue("There should be a split",
m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0);
@@ -713,4 +741,23 @@ public class SenderTest {
sender.run(time.milliseconds());
}
+ private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
+ transactionManager.initializeTransactions();
+ prepareFindCoordinatorResponse(Errors.NONE);
+ sender.run(time.milliseconds());
+ sender.run(time.milliseconds());
+
+ prepareInitPidResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
+ sender.run(time.milliseconds());
+ assertTrue(transactionManager.hasProducerId());
+ }
+
+ private void prepareFindCoordinatorResponse(Errors error) {
+ client.prepareResponse(new FindCoordinatorResponse(error, cluster.nodes().get(0)));
+ }
+
+ private void prepareInitPidResponse(Errors error, long pid, short epoch) {
+ client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e4a6b50d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c621d53..9734f59 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.record;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -491,18 +490,18 @@ public class MemoryRecordsBuilderTest {
}
@Test
- public void shouldThrowKafkaExceptionOnBuildWhenAborted() throws Exception {
+ public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
- TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.abort();
try {
builder.build();
fail("Should have thrown KafkaException");
- } catch (KafkaException e) {
+ } catch (IllegalStateException e) {
// ok
}
}