You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/24 08:13:45 UTC
kafka git commit: KAFKA-3747; Close `RecordBatch.records` when append to batch fails
Repository: kafka
Updated Branches:
refs/heads/trunk dee388066 -> fe27d8f78
KAFKA-3747; Close `RecordBatch.records` when append to batch fails
With this change, `test_producer_throughput` with message_size=10000, compression_type=snappy and a snappy buffer size of 32k can be executed in a heap of 192m in a local environment (768m is needed without this change).
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1418 from ijuma/kafka-3747-close-record-batch-when-append-fails
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe27d8f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe27d8f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe27d8f7
Branch: refs/heads/trunk
Commit: fe27d8f787f38428e0add36edeac9d694f16af53
Parents: dee3880
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue May 24 09:13:40 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 24 09:13:40 2016 +0100
----------------------------------------------------------------------
.../producer/internals/RecordAccumulator.java | 63 ++++++++++++--------
.../kafka/common/record/MemoryRecords.java | 5 ++
.../internals/RecordAccumulatorTest.java | 18 +++++-
3 files changed, 60 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe27d8f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5339096..a73d882 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -74,7 +74,6 @@ public final class RecordAccumulator {
private final Set<TopicPartition> muted;
private int drainIndex;
-
/**
* Create a new record accumulator
*
@@ -104,11 +103,11 @@ public final class RecordAccumulator {
this.compression = compression;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
- this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
+ this.batches = new CopyOnWriteMap<>();
String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
this.incomplete = new IncompleteRecordBatches();
- this.muted = new HashSet<TopicPartition>();
+ this.muted = new HashSet<>();
this.time = time;
registerMetrics(metrics, metricGrpName);
}
@@ -171,12 +170,9 @@ public final class RecordAccumulator {
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
- RecordBatch last = dq.peekLast();
- if (last != null) {
- FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
- if (future != null)
- return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
- }
+ RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+ if (appendResult != null)
+ return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
@@ -187,14 +183,12 @@ public final class RecordAccumulator {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
- RecordBatch last = dq.peekLast();
- if (last != null) {
- FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
- if (future != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
- free.deallocate(buffer);
- return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
- }
+
+ RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+ if (appendResult != null) {
+ // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+ free.deallocate(buffer);
+ return appendResult;
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
@@ -210,11 +204,27 @@ public final class RecordAccumulator {
}
/**
+ * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
+ * resources (like compression streams buffers).
+ */
+ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
+ RecordBatch last = deque.peekLast();
+ if (last != null) {
+ FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
+ if (future == null)
+ last.records.close();
+ else
+ return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
+ }
+ return null;
+ }
+
+ /**
* Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
* due to metadata being unavailable
*/
public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
- List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
+ List<RecordBatch> expiredBatches = new ArrayList<>();
int count = 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> dq = entry.getValue();
@@ -245,7 +255,7 @@ public final class RecordAccumulator {
}
}
}
- if (expiredBatches.size() > 0)
+ if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", count);
return expiredBatches;
@@ -287,7 +297,7 @@ public final class RecordAccumulator {
* </ol>
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
- Set<Node> readyNodes = new HashSet<Node>();
+ Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
boolean unknownLeadersExist = false;
@@ -333,7 +343,7 @@ public final class RecordAccumulator {
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> deque = entry.getValue();
synchronized (deque) {
- if (deque.size() > 0)
+ if (!deque.isEmpty())
return true;
}
}
@@ -357,11 +367,11 @@ public final class RecordAccumulator {
if (nodes.isEmpty())
return Collections.emptyMap();
- Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
+ Map<Integer, List<RecordBatch>> batches = new HashMap<>();
for (Node node : nodes) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
- List<RecordBatch> ready = new ArrayList<RecordBatch>();
+ List<RecordBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();
do {
@@ -436,6 +446,11 @@ public final class RecordAccumulator {
boolean flushInProgress() {
return flushesInProgress.get() > 0;
}
+
+ /* Visible for testing */
+ Map<TopicPartition, Deque<RecordBatch>> batches() {
+ return Collections.unmodifiableMap(batches);
+ }
/**
* Initiate the flushing of data from the accumulator...this makes all requests immediately ready
@@ -569,7 +584,7 @@ public final class RecordAccumulator {
public Iterable<RecordBatch> all() {
synchronized (incomplete) {
- return new ArrayList<RecordBatch>(this.incomplete);
+ return new ArrayList<>(this.incomplete);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe27d8f7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index fcf7f44..603f74b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -213,6 +213,11 @@ public class MemoryRecords implements Records {
return builder.toString();
}
+ /** Visible for testing */
+ public boolean isWritable() {
+ return writable;
+ }
+
public static class RecordsIterator extends AbstractIterator<LogEntry> {
private final ByteBuffer buffer;
private final DataInputStream stream;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe27d8f7/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index b3a5a04..43ac15a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -75,14 +76,27 @@ public class RecordAccumulatorTest {
@Test
public void testFull() throws Exception {
long now = time.milliseconds();
- RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
- int appends = 1024 / msgSize;
+ int batchSize = 1024;
+ RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
+ int appends = batchSize / msgSize;
for (int i = 0; i < appends; i++) {
+ // append to the first batch
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+ assertEquals(1, partitionBatches.size());
+ assertTrue(partitionBatches.peekFirst().records.isWritable());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
+
+ // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+ assertEquals(2, partitionBatches.size());
+ Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
+ assertFalse(partitionBatchesIterator.next().records.isWritable());
+ assertTrue(partitionBatchesIterator.next().records.isWritable());
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);