Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/24 08:13:45 UTC

kafka git commit: KAFKA-3747; Close `RecordBatch.records` when append to batch fails

Repository: kafka
Updated Branches:
  refs/heads/trunk dee388066 -> fe27d8f78


KAFKA-3747; Close `RecordBatch.records` when append to batch fails

With this change, `test_producer_throughput` with message_size=10000, compression_type=snappy and a snappy buffer size of 32k can be executed in a heap of 192m in a local environment (768m is needed without this change).
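
For context, the pattern behind the fix can be illustrated with a small, self-contained sketch. This is not the Kafka API: the class and method names below are hypothetical, and a GZIP stream stands in for the snappy stream the producer actually uses. The point is that every open batch's compression stream holds temporary working buffers, so closing a batch as soon as an append is rejected releases that memory instead of holding it until the batch is drained and sent.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Deque;
    import java.util.zip.GZIPOutputStream;

    // Simplified, hypothetical sketch (not the Kafka API). A GZIP stream stands in
    // for the snappy stream the producer uses; each open batch's compression stream
    // holds temporary working buffers until it is closed.
    class CompressedBatchSketch {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final GZIPOutputStream compressor;  // holds internal deflater buffers
        private final int maxRecords;
        private int appended = 0;
        private boolean writable = true;

        CompressedBatchSketch(int maxRecords) throws IOException {
            this.compressor = new GZIPOutputStream(sink);
            this.maxRecords = maxRecords;
        }

        /** Appends a record, or returns false if the batch is full. */
        boolean tryAppend(byte[] value) throws IOException {
            if (!writable || appended >= maxRecords)
                return false;
            compressor.write(value);
            appended++;
            return true;
        }

        /** Finishes the compressed stream and releases its temporary buffers. */
        void close() throws IOException {
            if (writable) {
                compressor.close();
                writable = false;
            }
        }

        // Caller pattern mirroring the fix: when the last batch rejects an append
        // because it is full, close it right away so its compression buffers are
        // freed now, rather than when the batch is eventually drained and sent.
        static void appendOrRollOver(Deque<CompressedBatchSketch> deque, byte[] value) throws IOException {
            CompressedBatchSketch last = deque.peekLast();
            if (last != null && !last.tryAppend(value))
                last.close();
            // allocating a fresh batch for `value` is omitted for brevity
        }
    }

The actual change, shown in the diff below, applies the same idea in `RecordAccumulator.tryAppend`: when `RecordBatch.tryAppend` returns null because the batch is full, the batch's `MemoryRecords` are closed immediately.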

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1418 from ijuma/kafka-3747-close-record-batch-when-append-fails


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe27d8f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe27d8f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe27d8f7

Branch: refs/heads/trunk
Commit: fe27d8f787f38428e0add36edeac9d694f16af53
Parents: dee3880
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue May 24 09:13:40 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 24 09:13:40 2016 +0100

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 63 ++++++++++++--------
 .../kafka/common/record/MemoryRecords.java      |  5 ++
 .../internals/RecordAccumulatorTest.java        | 18 +++++-
 3 files changed, 60 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe27d8f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5339096..a73d882 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -74,7 +74,6 @@ public final class RecordAccumulator {
     private final Set<TopicPartition> muted;
     private int drainIndex;
 
-
     /**
      * Create a new record accumulator
      * 
@@ -104,11 +103,11 @@ public final class RecordAccumulator {
         this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
-        this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
+        this.batches = new CopyOnWriteMap<>();
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
         this.incomplete = new IncompleteRecordBatches();
-        this.muted = new HashSet<TopicPartition>();
+        this.muted = new HashSet<>();
         this.time = time;
         registerMetrics(metrics, metricGrpName);
     }
@@ -171,12 +170,9 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
-                RecordBatch last = dq.peekLast();
-                if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-                    if (future != null)
-                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
-                }
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+                if (appendResult != null)
+                    return appendResult;
             }
 
             // we don't have an in-progress record batch try to allocate a new batch
@@ -187,14 +183,12 @@ public final class RecordAccumulator {
                 // Need to check if producer is closed again after grabbing the dequeue lock.
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
-                RecordBatch last = dq.peekLast();
-                if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-                    if (future != null) {
-                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
-                        free.deallocate(buffer);
-                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
-                    }
+
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+                if (appendResult != null) {
+                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+                    free.deallocate(buffer);
+                    return appendResult;
                 }
                 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
@@ -210,11 +204,27 @@ public final class RecordAccumulator {
     }
 
     /**
+     * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
+     * resources (like compression stream buffers).
+     */
+    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
+        RecordBatch last = deque.peekLast();
+        if (last != null) {
+            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
+            if (future == null)
+                last.records.close();
+            else
+                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
+        }
+        return null;
+    }
+
+    /**
      * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
      * due to metadata being unavailable
      */
     public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
-        List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
+        List<RecordBatch> expiredBatches = new ArrayList<>();
         int count = 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             Deque<RecordBatch> dq = entry.getValue();
@@ -245,7 +255,7 @@ public final class RecordAccumulator {
                 }
             }
         }
-        if (expiredBatches.size() > 0)
+        if (!expiredBatches.isEmpty())
             log.trace("Expired {} batches in accumulator", count);
 
         return expiredBatches;
@@ -287,7 +297,7 @@ public final class RecordAccumulator {
      * </ol>
      */
     public ReadyCheckResult ready(Cluster cluster, long nowMs) {
-        Set<Node> readyNodes = new HashSet<Node>();
+        Set<Node> readyNodes = new HashSet<>();
         long nextReadyCheckDelayMs = Long.MAX_VALUE;
         boolean unknownLeadersExist = false;
 
@@ -333,7 +343,7 @@ public final class RecordAccumulator {
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             Deque<RecordBatch> deque = entry.getValue();
             synchronized (deque) {
-                if (deque.size() > 0)
+                if (!deque.isEmpty())
                     return true;
             }
         }
@@ -357,11 +367,11 @@ public final class RecordAccumulator {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
-        Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
+        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
         for (Node node : nodes) {
             int size = 0;
             List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
-            List<RecordBatch> ready = new ArrayList<RecordBatch>();
+            List<RecordBatch> ready = new ArrayList<>();
             /* to make starvation less likely this loop doesn't start at 0 */
             int start = drainIndex = drainIndex % parts.size();
             do {
@@ -436,6 +446,11 @@ public final class RecordAccumulator {
     boolean flushInProgress() {
         return flushesInProgress.get() > 0;
     }
+
+    /* Visible for testing */
+    Map<TopicPartition, Deque<RecordBatch>> batches() {
+        return Collections.unmodifiableMap(batches);
+    }
     
     /**
      * Initiate the flushing of data from the accumulator...this makes all requests immediately ready
@@ -569,7 +584,7 @@ public final class RecordAccumulator {
         
         public Iterable<RecordBatch> all() {
             synchronized (incomplete) {
-                return new ArrayList<RecordBatch>(this.incomplete);
+                return new ArrayList<>(this.incomplete);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe27d8f7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index fcf7f44..603f74b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -213,6 +213,11 @@ public class MemoryRecords implements Records {
         return builder.toString();
     }
 
+    /** Visible for testing */
+    public boolean isWritable() {
+        return writable;
+    }
+
     public static class RecordsIterator extends AbstractIterator<LogEntry> {
         private final ByteBuffer buffer;
         private final DataInputStream stream;

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe27d8f7/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index b3a5a04..43ac15a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -75,14 +76,27 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
-        int appends = 1024 / msgSize;
+        int batchSize = 1024;
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
+        int appends = batchSize / msgSize;
         for (int i = 0; i < appends; i++) {
+            // append to the first batch
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+            assertEquals(1, partitionBatches.size());
+            assertTrue(partitionBatches.peekFirst().records.isWritable());
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
+
+        // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+        Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+        assertEquals(2, partitionBatches.size());
+        Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
+        assertFalse(partitionBatchesIterator.next().records.isWritable());
+        assertTrue(partitionBatchesIterator.next().records.isWritable());
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);