You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/05/22 00:31:36 UTC
kafka git commit: KAFKA-3995; KIP-126 Allow KafkaProducer to split and resend oversized batches
Repository: kafka
Updated Branches:
refs/heads/trunk 1c7fdd284 -> 7fad45557
KAFKA-3995; KIP-126 Allow KafkaProducer to split and resend oversized batches
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Joel Koshy <jj...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2638 from becketqin/KAFKA-3995
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7fad4555
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7fad4555
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7fad4555
Branch: refs/heads/trunk
Commit: 7fad45557e4cb7b345f34cec32f910b437c59bc2
Parents: 1c7fdd2
Author: Jiangjie Qin <be...@gmail.com>
Authored: Sun May 21 17:31:31 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sun May 21 17:31:31 2017 -0700
----------------------------------------------------------------------
checkstyle/checkstyle.xml | 2 +-
.../internals/FutureRecordMetadata.java | 34 +++-
.../producer/internals/ProducerBatch.java | 142 ++++++++++++-
.../producer/internals/RecordAccumulator.java | 37 +++-
.../clients/producer/internals/Sender.java | 29 ++-
.../kafka/common/record/AbstractRecords.java | 4 +
.../record/CompressionRatioEstimator.java | 111 ++++++++++
.../kafka/common/record/CompressionType.java | 6 +-
.../kafka/common/record/DefaultRecord.java | 8 +-
.../kafka/common/record/DefaultRecordBatch.java | 8 +
.../common/record/MemoryRecordsBuilder.java | 78 ++++---
.../internals/RecordAccumulatorTest.java | 203 ++++++++++++++++++-
.../clients/producer/internals/SenderTest.java | 90 ++++++++
.../common/record/MemoryRecordsBuilderTest.java | 9 +-
14 files changed, 701 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 9f9e9ae..cf57a50 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -105,7 +105,7 @@
</module>
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
- <property name="max" value="15"/>
+ <property name="max" value="17"/>
</module>
<module name="BooleanExpressionComplexity">
<!-- default is 3 -->
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index f8b38e8..1de965f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -34,6 +34,7 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
private final long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
+ private volatile FutureRecordMetadata nextRecordMetadata = null;
public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
long checksum, int serializedKeySize, int serializedValueSize) {
@@ -58,25 +59,54 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
this.result.await();
+ if (nextRecordMetadata != null)
+ return nextRecordMetadata.get();
return valueOrError();
}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ // Handle overflow. The deadline is tracked in wall-clock milliseconds, so the timeout must be converted from `unit` first.
+ long now = System.currentTimeMillis();
+ long deadline = Long.MAX_VALUE - unit.toMillis(timeout) < now ? Long.MAX_VALUE : now + unit.toMillis(timeout);
boolean occurred = this.result.await(timeout, unit);
+ if (nextRecordMetadata != null)
+ return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (!occurred)
throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
return valueOrError();
}
+ /**
+ * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the
+ * future that has already returned to the users to wait on the newly created split batches even after the
+ * old big batch has been deemed as done.
+ */
+ void chain(FutureRecordMetadata futureRecordMetadata) {
+ if (nextRecordMetadata == null)
+ nextRecordMetadata = futureRecordMetadata;
+ else
+ nextRecordMetadata.chain(futureRecordMetadata);
+ }
+
RecordMetadata valueOrError() throws ExecutionException {
if (this.result.error() != null)
throw new ExecutionException(this.result.error());
else
return value();
}
-
+
+ long checksum() {
+ return this.checksum;
+ }
+
+ long relativeOffset() {
+ return this.relativeOffset;
+ }
+
RecordMetadata value() {
+ if (nextRecordMetadata != null)
+ return nextRecordMetadata.value();
return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
timestamp(), this.checksum, this.serializedKeySize, this.serializedValueSize);
}
@@ -87,6 +117,8 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
@Override
public boolean isDone() {
+ if (nextRecordMetadata != null)
+ return nextRecordMetadata.isDone();
return this.result.completed();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 1c078c8..cdf85ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -16,15 +16,26 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ProduceResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +45,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
+
+
/**
* A batch of records that is or will be sent.
*
@@ -51,6 +65,7 @@ public final class ProducerBatch {
private final MemoryRecordsBuilder recordsBuilder;
private final AtomicInteger attempts = new AtomicInteger(0);
+ private final boolean isSplitBatch;
int recordCount;
int maxRecordSize;
private long lastAttemptMs;
@@ -61,6 +76,10 @@ public final class ProducerBatch {
private boolean retry;
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
+ this(tp, recordsBuilder, now, false);
+ }
+
+ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) {
this.createdMs = now;
this.lastAttemptMs = now;
this.recordsBuilder = recordsBuilder;
@@ -69,6 +88,10 @@ public final class ProducerBatch {
this.produceFuture = new ProduceRequestResult(topicPartition);
this.completed = new AtomicBoolean();
this.retry = false;
+ this.isSplitBatch = isSplitBatch;
+ float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
+ recordsBuilder.compressionType());
+ recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}
/**
@@ -87,14 +110,39 @@ public final class ProducerBatch {
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
- if (callback != null)
- thunks.add(new Thunk(callback, future));
+ // we have to keep every future returned to the users in case the batch needs to be
+ // split to several new batches and resent.
+ thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
/**
+ * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
+ * @return true if the record (including its headers) has been successfully appended, false otherwise.
+ */
+ private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
+ if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
+ return false;
+ } else {
+ // No need to get the CRC. The headers must be appended as well, otherwise they would be lost by the split.
+ this.recordsBuilder.append(timestamp, key, value, headers);
+ this.maxRecordSize = Math.max(this.maxRecordSize,
+ AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
+ FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
+ timestamp, thunk.future.checksum(),
+ key == null ? -1 : key.remaining(),
+ value == null ? -1 : value.remaining());
+ // Chain the future to the original thunk so the user's future resolves with the split batch's result.
+ thunk.future.chain(future);
+ this.thunks.add(thunk);
+ this.recordCount++;
+ return true;
+ }
+ }
+
+ /**
* Complete the request.
*
* @param baseOffset The base offset of the messages assigned by the server
@@ -116,9 +164,11 @@ public final class ProducerBatch {
try {
if (exception == null) {
RecordMetadata metadata = thunk.future.value();
- thunk.callback.onCompletion(metadata, null);
+ if (thunk.callback != null)
+ thunk.callback.onCompletion(metadata, null);
} else {
- thunk.callback.onCompletion(null, exception);
+ if (thunk.callback != null)
+ thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
@@ -128,6 +178,71 @@ public final class ProducerBatch {
produceFuture.done();
}
+ public Deque<ProducerBatch> split(int splitBatchSize) {
+ Deque<ProducerBatch> batches = new ArrayDeque<>();
+ MemoryRecords memoryRecords = recordsBuilder.build();
+ Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();
+ if (!recordBatchIter.hasNext())
+ throw new IllegalStateException("Cannot split an empty producer batch.");
+ RecordBatch recordBatch = recordBatchIter.next();
+ if (recordBatchIter.hasNext())
+ throw new IllegalStateException("A producer batch should only have one record batch.");
+
+ Iterator<Thunk> thunkIter = thunks.iterator();
+ // We always allocate batch size because we are already splitting a big batch.
+ // And we also Retain the create time of the original batch.
+ ProducerBatch batch = null;
+ for (Record record : recordBatch) {
+ assert thunkIter.hasNext();
+ Thunk thunk = thunkIter.next();
+ if (batch == null) {
+ batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
+ record, splitBatchSize, this.createdMs);
+ }
+
+ // A newly created batch can always host the first message.
+ if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
+ batches.add(batch);
+ batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
+ record, splitBatchSize, this.createdMs);
+ batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
+ }
+ }
+ // Close the last batch and add it to the batch list after split.
+ if (batch != null)
+ batches.add(batch);
+
+ produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new RecordBatchTooLargeException());
+ produceFuture.done();
+ return batches;
+ }
+
+ private ProducerBatch createBatchOffAccumulatorForRecord(TopicPartition tp,
+ CompressionType compressionType,
+ Record record,
+ int batchSize,
+ long createdMs) {
+ int initialSize = Math.max(Records.LOG_OVERHEAD + AbstractRecords.sizeInBytesUpperBound(magic(),
+ record.key(),
+ record.value(),
+ record.headers()),
+ batchSize);
+ return createBatchOffAccumulator(tp, compressionType, initialSize, createdMs);
+ }
+
+ // package private for testing purpose.
+ static ProducerBatch createBatchOffAccumulator(TopicPartition tp,
+ CompressionType compressionType,
+ int batchSize,
+ long createdMs) {
+ ByteBuffer buffer = ByteBuffer.allocate(batchSize);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+ compressionType,
+ TimestampType.CREATE_TIME,
+ batchSize);
+ return new ProducerBatch(tp, builder, createdMs, true);
+ }
+
/**
* A callback and the associated FutureRecordMetadata argument to pass to it.
*/
@@ -135,7 +250,7 @@ public final class ProducerBatch {
final Callback callback;
final FutureRecordMetadata future;
- public Thunk(Callback callback, FutureRecordMetadata future) {
+ Thunk(Callback callback, FutureRecordMetadata future) {
this.callback = callback;
this.future = future;
}
@@ -155,7 +270,7 @@ public final class ProducerBatch {
* This methods closes this batch and sets {@code expiryErrorMessage} if the batch has timed out.
* {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks.
*/
- public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
+ boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
@@ -177,7 +292,7 @@ public final class ProducerBatch {
void expirationDone() {
if (expiryErrorMessage == null)
throw new IllegalStateException("Batch has not expired");
- this.done(-1L, RecordBatch.NO_TIMESTAMP,
+ this.done(-1L, NO_TIMESTAMP,
new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
}
@@ -208,6 +323,10 @@ public final class ProducerBatch {
this.drainedMs = Math.max(drainedMs, nowMs);
}
+ boolean isSplitBatch() {
+ return isSplitBatch;
+ }
+
/**
* Returns if the batch is been retried for sending to kafka
*/
@@ -223,8 +342,8 @@ public final class ProducerBatch {
return recordsBuilder.sizeInBytes();
}
- public double compressionRate() {
- return recordsBuilder.compressionRate();
+ public double compressionRatio() {
+ return recordsBuilder.compressionRatio();
}
public boolean isFull() {
@@ -245,6 +364,11 @@ public final class ProducerBatch {
public void close() {
recordsBuilder.close();
+ if (!recordsBuilder.isControlBatch()) {
+ CompressionRatioEstimator.updateEstimation(topicPartition.topic(),
+ recordsBuilder.compressionType(),
+ (float) recordsBuilder.compressionRatio());
+ }
}
public void abort() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5b8fb96..e1f04a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
@@ -325,6 +326,30 @@ public final class RecordAccumulator {
}
/**
+ * Split the big batch that has been rejected and re-enqueue the split batches into the accumulator.
+ * @return the number of split batches.
+ */
+ public int splitAndReenqueue(ProducerBatch bigBatch) {
+ // Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
+ // is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
+ // the split doesn't happen too often.
+ CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
+ Math.max(1.0f, (float) bigBatch.compressionRatio()));
+ Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
+ int numSplitBatches = dq.size();
+ Deque<ProducerBatch> partitionDequeue = getOrCreateDeque(bigBatch.topicPartition);
+ while (!dq.isEmpty()) {
+ ProducerBatch batch = dq.pollLast(); // taking from the tail and using addFirst below keeps the split batches in their original order at the head of the partition deque
+ incomplete.add(batch);
+ // We treat the newly split batches as if they are not even tried.
+ synchronized (partitionDequeue) {
+ partitionDequeue.addFirst(batch);
+ }
+ }
+ return numSplitBatches;
+ }
+
+ /**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
@@ -506,7 +531,17 @@ public final class RecordAccumulator {
*/
public void deallocate(ProducerBatch batch) {
incomplete.remove(batch);
- free.deallocate(batch.buffer(), batch.initialCapacity());
+ // Only deallocate the batch if it is not a split batch because split batch are allocated aside the
+ // buffer pool.
+ if (!batch.isSplitBatch())
+ free.deallocate(batch.buffer(), batch.initialCapacity());
+ }
+
+ /**
+ * Package private for unit test. Get the buffer pool remaining size in bytes.
+ */
+ long bufferPoolAvailableMemory() {
+ return free.availableMemory();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8dea9c6..4c3b99d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -185,7 +185,7 @@ public class Sender implements Runnable {
/**
* Run a single iteration of sending
- *
+ *
* @param now The current POSIX time in milliseconds
*/
void run(long now) {
@@ -478,7 +478,7 @@ public class Sender implements Runnable {
/**
* Complete or retry the given batch of records.
- *
+ *
* @param batch The record batch
* @param response The produce response
* @param correlationId The correlation id for the request
@@ -487,7 +487,18 @@ public class Sender implements Runnable {
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
long now) {
Errors error = response.error;
- if (error != Errors.NONE) {
+ if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1) {
+ // If the batch is too large, we split the batch and send the split batches again. We do not decrement
+ // the retry attempts in this case.
+ log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}",
+ correlationId,
+ batch.topicPartition,
+ this.retries - batch.attempts(),
+ error);
+ this.accumulator.splitAndReenqueue(batch);
+ this.accumulator.deallocate(batch);
+ this.sensors.recordBatchSplit();
+ } else if (error != Errors.NONE) {
if (canRetry(batch, error)) {
log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
correlationId,
@@ -656,6 +667,7 @@ public class Sender implements Runnable {
public final Sensor compressionRateSensor;
public final Sensor maxRecordSizeSensor;
public final Sensor produceThrottleTimeSensor;
+ public final Sensor batchSplitSensor;
public SenderMetrics(Metrics metrics) {
this.metrics = metrics;
@@ -721,6 +733,10 @@ public class Sender implements Runnable {
return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
}
});
+
+ this.batchSplitSensor = metrics.sensor("batch-split-rate");
+ m = metrics.metricName("batch-split-rate", metricGrpName, "The rate of record batch split");
+ this.batchSplitSensor.add(m, new Rate());
}
private void maybeRegisterTopicMetrics(String topic) {
@@ -780,12 +796,12 @@ public class Sender implements Runnable {
// per-topic compression rate
String topicCompressionRateName = "topic." + topic + ".compression-rate";
Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
- topicCompressionRate.record(batch.compressionRate());
+ topicCompressionRate.record(batch.compressionRatio());
// global metrics
this.batchSizeSensor.record(batch.sizeInBytes(), now);
this.queueTimeSensor.record(batch.queueTimeMs(), now);
- this.compressionRateSensor.record(batch.compressionRate());
+ this.compressionRateSensor.record(batch.compressionRatio());
this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
records += batch.recordCount;
}
@@ -826,6 +842,9 @@ public class Sender implements Runnable {
this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
}
+ void recordBatchSplit() {
+ this.batchSplitSensor.record();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index cfda8a4..2771ab7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -165,6 +165,10 @@ public abstract class AbstractRecords implements Records {
}
public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value, Header[] headers) {
+ return sizeInBytesUpperBound(magic, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+ }
+
+ public static int sizeInBytesUpperBound(byte magic, ByteBuffer key, ByteBuffer value, Header[] headers) {
if (magic >= RecordBatch.MAGIC_VALUE_V2)
return DefaultRecordBatch.batchSizeUpperBound(key, value, headers);
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
new file mode 100644
index 0000000..7f11784
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ * This class helps estimate the compression ratio for each topic and compression type combination.
+ */
+public class CompressionRatioEstimator {
+ // The constant step by which the estimated compression ratio is decreased when a batch compresses better than expected.
+ public static final float COMPRESSION_RATIO_IMPROVING_STEP = 0.005f;
+ // The minimum step by which the estimated compression ratio is increased when a batch compresses worse than expected.
+ public static final float COMPRESSION_RATIO_DETERIORATE_STEP = 0.05f;
+ private static final ConcurrentMap<String, float[]> COMPRESSION_RATIO = new ConcurrentHashMap<>();
+
+ /**
+ * Update the compression ratio estimation for a topic and compression type. The read-modify-write is done under the per-topic array lock.
+ *
+ * @param topic the topic to update compression ratio estimation.
+ * @param type the compression type.
+ * @param observedRatio the observed compression ratio.
+ * @return the compression ratio estimation after the update.
+ */
+ public static float updateEstimation(String topic, CompressionType type, float observedRatio) {
+ float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+ synchronized (compressionRatioForTopic) {
+ float currentEstimation = compressionRatioForTopic[type.id];
+ if (observedRatio > currentEstimation)
+ compressionRatioForTopic[type.id] = Math.max(currentEstimation + COMPRESSION_RATIO_DETERIORATE_STEP, observedRatio);
+ else if (observedRatio < currentEstimation)
+ compressionRatioForTopic[type.id] = currentEstimation - COMPRESSION_RATIO_IMPROVING_STEP;
+ // Read the current value and return it under the same lock so a concurrent update cannot be lost or interleaved.
+ return compressionRatioForTopic[type.id];
+ }
+ }
+
+ /**
+ * Get the compression ratio estimation for a topic and compression type. The value is read without locking.
+ */
+ public static float estimation(String topic, CompressionType type) {
+ float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+ return compressionRatioForTopic[type.id];
+ }
+
+ /**
+ * Reset the compression ratio estimation for a topic to the initial values (each compression type's default rate).
+ */
+ public static void resetEstimation(String topic) {
+ float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+ synchronized (compressionRatioForTopic) {
+ for (CompressionType type : CompressionType.values()) {
+ compressionRatioForTopic[type.id] = type.rate;
+ }
+ }
+ }
+
+ /**
+ * Remove the compression ratio estimation for a topic.
+ */
+ public static void removeEstimation(String topic) {
+ COMPRESSION_RATIO.remove(topic);
+ }
+
+ /**
+ * Set the compression ratio estimation for a topic and compression type combination, replacing the current value.
+ */
+ public static void setEstimation(String topic, CompressionType type, float ratio) {
+ float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+ synchronized (compressionRatioForTopic) {
+ compressionRatioForTopic[type.id] = ratio;
+ }
+ }
+
+ private static float[] getAndCreateEstimationIfAbsent(String topic) {
+ float[] compressionRatioForTopic = COMPRESSION_RATIO.get(topic);
+ if (compressionRatioForTopic == null) {
+ compressionRatioForTopic = initialCompressionRatio();
+ float[] existingCompressionRatio = COMPRESSION_RATIO.putIfAbsent(topic, compressionRatioForTopic);
+ // Someone created the compression ratio array before us, use it.
+ if (existingCompressionRatio != null)
+ return existingCompressionRatio;
+ }
+ return compressionRatioForTopic;
+ }
+
+ private static float[] initialCompressionRatio() {
+ float[] compressionRatio = new float[CompressionType.values().length];
+ for (CompressionType type : CompressionType.values()) {
+ compressionRatio[type.id] = type.rate;
+ }
+ return compressionRatio;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index a78c5a2..15b5958 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -44,7 +44,7 @@ public enum CompressionType {
}
},
- GZIP(1, "gzip", 0.5f) {
+ GZIP(1, "gzip", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
try {
@@ -64,7 +64,7 @@ public enum CompressionType {
}
},
- SNAPPY(2, "snappy", 0.5f) {
+ SNAPPY(2, "snappy", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
try {
@@ -84,7 +84,7 @@ public enum CompressionType {
}
},
- LZ4(3, "lz4", 0.5f) {
+ LZ4(3, "lz4", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 669c75d..37f92d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -494,8 +494,12 @@ public class DefaultRecord implements Record {
}
static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
- int keySize = key == null ? -1 : key.length;
- int valueSize = value == null ? -1 : value.length;
+ return recordSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+ }
+
+ static int recordSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
+ int keySize = key == null ? -1 : key.remaining();
+ int valueSize = value == null ? -1 : value.remaining();
return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 74bd3c0..589e67c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import org.apache.kafka.common.utils.Utils;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@@ -440,6 +441,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
* Get an upper bound on the size of a batch with only a single record using a given key and value.
*/
static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
+ return batchSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers); // delegate to the ByteBuffer overload below
+ }
+
+ /**
+ * ByteBuffer variant: get an upper bound on the size of a batch with only a single record using a given (possibly null) key and value buffer.
+ */
+ static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 6f90fac..42ae0f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -37,22 +37,9 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
* This will release resources like compression buffers that can be relatively large (64 KB for LZ4).
*/
public class MemoryRecordsBuilder {
- private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
- private static final float[] TYPE_TO_RATE;
-
- static {
- int maxTypeId = -1;
- for (CompressionType type : CompressionType.values())
- maxTypeId = Math.max(maxTypeId, type.id);
- TYPE_TO_RATE = new float[maxTypeId + 1];
- for (CompressionType type : CompressionType.values()) {
- TYPE_TO_RATE[type.id] = type.rate;
- }
- }
-
private final TimestampType timestampType;
private final CompressionType compressionType;
// Used to append records, may compress data on the fly
@@ -71,13 +58,15 @@ public class MemoryRecordsBuilder {
private final int writeLimit;
private final int initialCapacity;
+ // Default to 1.0 (no compression) so estimatedBytesWritten() is not 0 when no estimate has been set.
+ private volatile float estimatedCompressionRatio = 1.0F;
private boolean appendStreamIsClosed = false;
private long producerId;
private short producerEpoch;
private int baseSequence;
private long writtenUncompressed = 0;
private int numRecords = 0;
- private float compressionRate = 1;
+ private float actualCompressionRatio = 1;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
private long offsetOfMaxTimestamp = -1;
private Long lastOffset = null;
@@ -134,7 +123,7 @@ public class MemoryRecordsBuilder {
this.initPos = buffer.position();
this.numRecords = 0;
this.writtenUncompressed = 0;
- this.compressionRate = 1;
+ this.actualCompressionRatio = 1;
this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
@@ -167,8 +156,16 @@ public class MemoryRecordsBuilder {
return initialCapacity;
}
- public double compressionRate() {
- return compressionRate;
+ public double compressionRatio() {
+ return actualCompressionRatio; // compressed / uncompressed bytes once a batch header has been written; starts at 1
+ }
+
+ public CompressionType compressionType() {
+ return compressionType;
+ }
+
+ public boolean isControlBatch() {
+ return isControlBatch;
}
/**
@@ -284,9 +281,9 @@ public class MemoryRecordsBuilder {
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
- writeDefaultBatchHeader();
+ this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.writtenUncompressed;
else if (compressionType != CompressionType.NONE)
- writeLegacyCompressedWrapperHeader();
+ this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.writtenUncompressed;
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
@@ -295,12 +292,17 @@ public class MemoryRecordsBuilder {
}
}
- private void writeDefaultBatchHeader() {
+ /**
+ * Write the header to the default batch.
+ * @return the written compressed bytes.
+ */
+ private int writeDefaultBatchHeader() {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
buffer.position(initPos);
int size = pos - initPos;
+ int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
int offsetDelta = (int) (lastOffset - baseOffset);
final long baseTimestamp;
@@ -318,9 +320,14 @@ public class MemoryRecordsBuilder {
partitionLeaderEpoch, numRecords);
buffer.position(pos);
+ return writtenCompressed;
}
- private void writeLegacyCompressedWrapperHeader() {
+ /**
+ * Write the header to the legacy batch.
+ * @return the written compressed bytes.
+ */
+ private int writeLegacyCompressedWrapperHeader() {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
@@ -334,11 +341,7 @@ public class MemoryRecordsBuilder {
LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
buffer.position(pos);
-
- // update the compression ratio
- this.compressionRate = (float) writtenCompressed / this.writtenUncompressed;
- TYPE_TO_RATE[compressionType.id] = TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_DAMPING_FACTOR +
- compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
+ return writtenCompressed;
}
private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
@@ -440,7 +443,7 @@ public class MemoryRecordsBuilder {
public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
return append(timestamp, key, value, Record.EMPTY_HEADERS);
}
-
+
/**
* Append a new record at the next sequential offset.
* @param timestamp The record timestamp
@@ -636,11 +639,25 @@ public class MemoryRecordsBuilder {
return buffer().position();
} else {
// estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
- return (int) (writtenUncompressed * TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ return (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
}
}
/**
+ * Set the estimated compression ratio (compressed / uncompressed bytes) that estimatedBytesWritten() applies to the uncompressed bytes written so far.
+ */
+ public void setEstimatedCompressionRatio(float estimatedCompressionRatio) {
+ this.estimatedCompressionRatio = estimatedCompressionRatio;
+ }
+
+ /**
+ * Check if we have room for a new record containing the given key/value pair; wraps the arrays and delegates to the ByteBuffer overload.
+ */
+ public boolean hasRoomFor(long timestamp, byte[] key, byte[] value) {
+ return hasRoomFor(timestamp, wrapNullable(key), wrapNullable(value));
+ }
+
+ /**
* Check if we have room for a new record containing the given key/value pair
*
* Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
@@ -652,7 +669,7 @@ public class MemoryRecordsBuilder {
* the checking should be based on the capacity of the initialized buffer rather than the write limit in order
* to accept this single record.
*/
- public boolean hasRoomFor(long timestamp, byte[] key, byte[] value) {
+ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value) {
if (isFull())
return false;
@@ -662,9 +679,10 @@ public class MemoryRecordsBuilder {
} else {
int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
- recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value);
+ recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, Record.EMPTY_HEADERS);
}
+ // Be conservative and not take compression of the new record into consideration.
return numRecords == 0 ?
this.initialCapacity >= recordSize :
this.writeLimit >= estimatedBytesWritten() + recordSize;
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index af599ca..b9675c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
@@ -28,6 +31,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.DefaultRecord;
@@ -55,6 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -311,7 +316,7 @@ public class RecordAccumulatorTest {
assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition);
}
-
+
@Test
public void testFlush() throws Exception {
long lingerMs = Long.MAX_VALUE;
@@ -321,16 +326,16 @@ public class RecordAccumulatorTest {
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
-
+
accum.beginFlush();
result = accum.ready(cluster, time.milliseconds());
-
+
// drain and deallocate all batches
Map<Integer, List<ProducerBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
for (List<ProducerBatch> batches: results.values())
for (ProducerBatch batch: batches)
accum.deallocate(batch);
-
+
// should be complete with no unsent records.
accum.awaitFlushCompletion();
assertFalse(accum.hasUnsent());
@@ -552,6 +557,196 @@ public class RecordAccumulatorTest {
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
}
+ @Test
+ public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
+ long now = time.milliseconds();
+ RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time,
+ new ApiVersions(), null);
+ // Create a big batch
+ ProducerBatch batch = ProducerBatch.createBatchOffAccumulator(tp1, CompressionType.NONE, 4096, now);
+ byte[] value = new byte[1024];
+ final AtomicInteger acked = new AtomicInteger(0);
+ Callback cb = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ acked.incrementAndGet();
+ }
+ };
+ // Append two messages so the batch is too big.
+ Future<RecordMetadata> future1 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now);
+ Future<RecordMetadata> future2 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now);
+ assertNotNull(future1);
+ assertNotNull(future2);
+ batch.close();
+ // Enqueue the batch to the accumulator as if the batch had been created by the accumulator.
+ accum.reenqueue(batch, now);
+ time.sleep(101L);
+ // Drain the batch.
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+ assertTrue("The batch should be ready", result.readyNodes.size() > 0);
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertEquals("Only node1 should be drained", 1, drained.size());
+ assertEquals("Only one batch should be drained", 1, drained.get(node1.id()).size());
+ // Split and reenqueue the batch.
+ accum.splitAndReenqueue(drained.get(node1.id()).get(0));
+ time.sleep(101L);
+
+ drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertFalse(drained.isEmpty());
+ assertFalse(drained.get(node1.id()).isEmpty());
+ drained.get(node1.id()).get(0).done(acked.get(), 100L, null);
+ assertEquals("The first message should have been acked.", 1, acked.get());
+ assertTrue(future1.isDone());
+ assertEquals(0, future1.get().offset());
+
+ drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertFalse(drained.isEmpty());
+ assertFalse(drained.get(node1.id()).isEmpty());
+ drained.get(node1.id()).get(0).done(acked.get(), 100L, null);
+ assertEquals("Both messages should have been acked.", 2, acked.get());
+ assertTrue(future2.isDone());
+ assertEquals(1, future2.get().offset());
+ }
+
+ @Test
+ public void testSplitBatchOffAccumulator() throws InterruptedException {
+ long seed = System.currentTimeMillis();
+ final int batchSize = 1024;
+ final int bufferCapacity = 3 * 1024;
+
+ // First set the compression ratio estimation to be good.
+ CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
+ RecordAccumulator accum = new RecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L, 100L,
+ metrics, time, new ApiVersions(), null);
+ int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
+ assertTrue("There should be some split batches", numSplitBatches > 0);
+ // Drain all the split batches.
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+ for (int i = 0; i < numSplitBatches; i++) {
+ Map<Integer, List<ProducerBatch>> drained =
+ accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertFalse(drained.isEmpty());
+ assertFalse(drained.get(node1.id()).isEmpty());
+ }
+ assertTrue("All the batches should have been drained.",
+ accum.ready(cluster, time.milliseconds()).readyNodes.isEmpty());
+ assertEquals("The split batches should be allocated off the accumulator",
+ bufferCapacity, accum.bufferPoolAvailableMemory());
+ }
+
+ @Test
+ public void testSplitFrequency() throws InterruptedException {
+ long seed = System.currentTimeMillis();
+ Random random = new Random();
+ random.setSeed(seed);
+ final int batchSize = 1024;
+ final int numMessages = 1000;
+
+ RecordAccumulator accum = new RecordAccumulator(batchSize, 3 * 1024, CompressionType.GZIP, 10, 100L,
+ metrics, time, new ApiVersions(), null);
+ // Adjust the high and low compression ratio message percentage
+ for (int goodCompRatioPercentage = 1; goodCompRatioPercentage < 100; goodCompRatioPercentage++) {
+ int numSplit = 0;
+ int numBatches = 0;
+ CompressionRatioEstimator.resetEstimation(topic);
+ for (int i = 0; i < numMessages; i++) {
+ int dice = random.nextInt(100);
+ byte[] value = (dice < goodCompRatioPercentage) ?
+ bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100);
+ accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0);
+ BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
+ numSplit += result.numSplit;
+ numBatches += result.numBatches;
+ }
+ time.sleep(10);
+ BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
+ numSplit += result.numSplit;
+ numBatches += result.numBatches;
+ assertTrue(String.format("Total num batches = %d, split batches = %d, more than 10%% of the batch splits. "
+ + "Random seed is " + seed,
+ numBatches, numSplit), (double) numSplit / numBatches < 0.1f);
+ }
+ }
+
+ private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords)
+ throws InterruptedException {
+ Random random = new Random();
+ random.setSeed(seed);
+
+ // First set the compression ratio estimation to be good.
+ CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
+ // Appending numRecords records of recordSize bytes with poor compression ratio should make the batch too big.
+ for (int i = 0; i < numRecords; i++) {
+ accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0);
+ }
+
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+ assertFalse(result.readyNodes.isEmpty());
+ Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertEquals(1, batches.size());
+ assertEquals(1, batches.values().iterator().next().size());
+ ProducerBatch batch = batches.values().iterator().next().get(0);
+ int numSplitBatches = accum.splitAndReenqueue(batch);
+ accum.deallocate(batch);
+
+ return numSplitBatches;
+ }
+
+ private BatchDrainedResult completeOrSplitBatches(RecordAccumulator accum, int batchSize) {
+ int numSplit = 0;
+ int numBatches = 0;
+ boolean batchDrained;
+ do {
+ batchDrained = false;
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ for (List<ProducerBatch> batchList : batches.values()) {
+ for (ProducerBatch batch : batchList) {
+ batchDrained = true;
+ numBatches++;
+ if (batch.sizeInBytes() > batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD) {
+ accum.splitAndReenqueue(batch);
+ // the original big batch is released by the deallocate() call below.
+ numSplit++;
+ } else {
+ batch.done(0L, 0L, null);
+ }
+ accum.deallocate(batch);
+ }
+ }
+ } while (batchDrained);
+ return new BatchDrainedResult(numSplit, numBatches);
+ }
+
+ /**
+ * Generates the compression ratio at about 0.6
+ */
+ private byte[] bytesWithGoodCompression(Random random) {
+ byte[] value = new byte[100];
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ while (buffer.remaining() > 0)
+ buffer.putInt(random.nextInt(1000));
+ return value;
+ }
+
+ /**
+ * Generates the compression ratio at about 0.9
+ */
+ private byte[] bytesWithPoorCompression(Random random, int size) {
+ byte[] value = new byte[size];
+ random.nextBytes(value);
+ return value;
+ }
+
+ private static class BatchDrainedResult {
+ final int numSplit;
+ final int numBatches;
+ BatchDrainedResult(int numSplit, int numBatches) {
+ this.numBatches = numBatches;
+ this.numSplit = numSplit;
+ }
+ }
+
/**
* Return the offset delta.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 1321fba..cc30f4d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
@@ -531,6 +532,95 @@ public class SenderTest {
assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
}
+ @Test
+ public void testSplitBatchAndSend() throws Exception {
+ int maxRetries = 1;
+ String topic = "testSplitBatchAndSend";
+ // Set a good compression ratio.
+ CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
+ Metrics m = new Metrics();
+ TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0);
+ txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(123456L, (short) 0));
+ accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+ new ApiVersions(), txnManager);
+ try {
+ Sender sender = new Sender(client,
+ metadata,
+ this.accumulator,
+ true,
+ MAX_REQUEST_SIZE,
+ ACKS_ALL,
+ maxRetries,
+ m,
+ time,
+ REQUEST_TIMEOUT,
+ 1000L,
+ txnManager,
+ new ApiVersions());
+ // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
+ Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
+ metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
+ // Append two records, each with a value of half the batch size.
+ TopicPartition tp2 = new TopicPartition(topic, 1);
+ Future<RecordMetadata> f1 =
+ accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> f2 =
+ accumulator.append(tp2, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds()); // connect
+ sender.run(time.milliseconds()); // send produce request
+ assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+ String id = client.requests().peek().destination();
+ assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
+ Node node = new Node(Integer.valueOf(id), "localhost", 0);
+ assertEquals(1, client.inFlightRequestCount());
+ assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+ responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
+ client.respond(new ProduceResponse(responseMap));
+ sender.run(time.milliseconds()); // split and reenqueue
+ // The compression ratio should have been improved once.
+ assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
+ CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
+ sender.run(time.milliseconds()); // send produce request
+ assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+ assertFalse("The future shouldn't have been done.", f1.isDone());
+ assertFalse("The future shouldn't have been done.", f2.isDone());
+ id = client.requests().peek().destination();
+ assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
+ node = new Node(Integer.valueOf(id), "localhost", 0);
+ assertEquals(1, client.inFlightRequestCount());
+ assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
+ responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
+ client.respond(new ProduceResponse(responseMap));
+ sender.run(time.milliseconds()); // receive
+ assertTrue("The future should have been done.", f1.isDone());
+ assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue());
+ assertFalse("The future shouldn't have been done.", f2.isDone());
+ assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
+ sender.run(time.milliseconds()); // send produce request
+ id = client.requests().peek().destination();
+ assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
+ node = new Node(Integer.valueOf(id), "localhost", 0);
+ assertEquals(1, client.inFlightRequestCount());
+ assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
+ responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
+ client.respond(new ProduceResponse(responseMap));
+ sender.run(time.milliseconds()); // receive
+ assertTrue("The future should have been done.", f2.isDone());
+ assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue());
+ assertEquals("Offset of the second message should be 1", 1L, f2.get().offset());
+ assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp2).isEmpty());
+
+ assertTrue("There should be a split",
+ m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0);
+ } finally {
+ m.close();
+ }
+ }
+
private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
assertTrue("Request should be completed", future.isDone());
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c08a2f0..58d4371 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -223,11 +223,11 @@ public class MemoryRecordsBuilderTest {
MemoryRecords built = builder.build();
if (compressionType == CompressionType.NONE) {
- assertEquals(1.0, builder.compressionRate(), 0.00001);
+ assertEquals(1.0, builder.compressionRatio(), 0.00001);
} else {
int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
double computedCompressionRate = (double) compressedSize / uncompressedSize;
- assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+ assertEquals(computedCompressionRate, builder.compressionRatio(), 0.00001);
}
}
@@ -254,11 +254,11 @@ public class MemoryRecordsBuilderTest {
MemoryRecords built = builder.build();
if (compressionType == CompressionType.NONE) {
- assertEquals(1.0, builder.compressionRate(), 0.00001);
+ assertEquals(1.0, builder.compressionRatio(), 0.00001);
} else {
int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V1;
double computedCompressionRate = (double) compressedSize / uncompressedSize;
- assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+ assertEquals(computedCompressionRate, builder.compressionRatio(), 0.00001);
}
}
@@ -359,6 +359,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ builder.setEstimatedCompressionRatio(0.5f);
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(1L, "b".getBytes(), "2".getBytes());