Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/03 23:14:54 UTC

kafka git commit: MINOR: Close the producer batch append stream when the batch gets full to free up resources

Repository: kafka
Updated Branches:
  refs/heads/trunk 040fde8ec -> b9b2cfc28


MINOR: Close the producer batch append stream when the batch gets full to free up resources

Of particular importance are compression buffers (64 KB for LZ4, for example).
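
To make the change concrete, here is a minimal, hypothetical Java sketch of the
pattern this commit applies. It is not Kafka's implementation: the JDK's
GZIPOutputStream stands in for the LZ4 append stream, and ByteArrayOutputStream
stands in for Kafka's ByteBufferOutputStream.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class EarlyCloseSketch {
    public static void main(String[] args) throws IOException {
        // Stands in for Kafka's ByteBufferOutputStream: it keeps the written
        // bytes available after the compressing stream on top of it is closed.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        // Stands in for the compressed append stream. The compressor holds
        // sizeable internal buffers until close() is called.
        DataOutputStream appendStream = new DataOutputStream(new GZIPOutputStream(buffer));

        appendStream.writeBytes("record-1");
        appendStream.writeBytes("record-2");

        // The batch is full: close the append stream now so the compression
        // buffers can be reclaimed. The compressed bytes remain in `buffer`.
        appendStream.close();

        System.out.println("compressed size = " + buffer.size());
    }
}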

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2796 from apurvam/idempotent-producer-close-data-stream


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9b2cfc2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9b2cfc2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9b2cfc2

Branch: refs/heads/trunk
Commit: b9b2cfc28c055745f606ba1a20c5e8e428acefcf
Parents: 040fde8
Author: Apurva Mehta <ap...@confluent.io>
Authored: Tue Apr 4 00:13:47 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 4 00:13:56 2017 +0100

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       |  8 ++++
 .../producer/internals/RecordAccumulator.java   | 14 ++++--
 .../common/record/MemoryRecordsBuilder.java     | 47 +++++++++++++++++---
 core/src/main/scala/kafka/log/Log.scala         |  2 -
 core/src/main/scala/kafka/log/LogCleaner.scala  |  2 +-
 5 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 4f68c20..9621794 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -234,6 +234,14 @@ public final class ProducerBatch {
         recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
     }
 
+    /**
+     * Release resources required for record appends (e.g. compression buffers). Once this method is called, it's only
+     * possible to update the RecordBatch header.
+     */
+    public void closeForRecordAppends() {
+        recordsBuilder.closeForRecordAppends();
+    }
+
     public void close() {
         recordsBuilder.close();
     }
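
The intended call order is sketched below. ProducerBatch is an internal client
class, so this is illustrative rather than public API usage, and the batch is
assumed to be supplied by the RecordAccumulator rather than constructed here.

import org.apache.kafka.clients.producer.internals.ProducerBatch;

class BatchLifecycleSketch {
    void onBatchFull(ProducerBatch batch) {
        // Phase 1: no further records will be appended, so release the
        // append-side resources (e.g. the LZ4 compression buffer) right away.
        batch.closeForRecordAppends();
    }

    void onReadyToSend(ProducerBatch batch) {
        // Phase 2: write the record batch header and build the memory records.
        // Also invoked on batch expiry or producer close, whichever is first.
        batch.close();
    }
}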

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 299356e..be03142 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -237,16 +237,22 @@ public final class RecordAccumulator {
     }
 
     /**
-     *  Try to append to a ProducerBatch. If it is full, we return null and a new batch is created. If the existing batch is
-     *  full, it will be closed right before send, or if it is expired, or when the producer is closed, whichever
-     *  comes first.
+     *  Try to append to a ProducerBatch.
+     *
+     *  If it is full, we return null and a new batch is created by the caller. We also close the batch for record
+     *  appends to free up resources like compression buffers. The batch will be fully closed (i.e. the record batch
+     *  headers will be written and the memory records built) in one of the following cases (whichever comes first):
+     *  right before send, if it is expired, or when the producer is closed.
      */
     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-            if (future != null)
+            if (future == null)
+                last.closeForRecordAppends();
+            else
                 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
+
         }
         return null;
     }
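
A simplified, self-contained model of the flow above, using a hypothetical
Batch interface rather than Kafka's classes:

import java.util.ArrayDeque;
import java.util.Deque;

class AccumulatorSketch {
    interface Batch {
        boolean tryAppend(byte[] record); // false once the batch is full
        void closeForRecordAppends();     // frees compression buffers
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    /** Returns true if the record fit into the current last batch. */
    boolean tryAppend(byte[] record) {
        Batch last = deque.peekLast();
        if (last == null)
            return false; // the caller must create the first batch
        if (last.tryAppend(record))
            return true;
        // The batch is full and will never take another record, so free its
        // append-side resources now rather than waiting until send time.
        last.closeForRecordAppends();
        return false; // the caller creates a new batch and retries
    }
}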

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index d238093..7f66193 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -30,6 +30,10 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  * This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}.
  * It transparently handles compression and exposes methods for appending new records, possibly with message
  * format conversion.
+ *
+ * In cases where keeping memory usage low is important and there is a gap between the time that record appends stop
+ * and the builder is closed (e.g. in the Producer), `closeForRecordAppends` should be called as soon as appends stop.
+ * This will release resources like compression buffers, which can be relatively large (64 KB for LZ4).
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
@@ -50,7 +54,11 @@ public class MemoryRecordsBuilder {
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
+    // Used to append records, may compress data on the fly
     private final DataOutputStream appendStream;
+    // Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access
+    // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough,
+    // so it's not safe to hold a direct reference to the underlying ByteBuffer.
     private final ByteBufferOutputStream bufferStream;
     private final byte magic;
     private final int initPos;
@@ -61,6 +69,7 @@ public class MemoryRecordsBuilder {
     private final int writeLimit;
     private final int initialCapacity;
 
+    private boolean appendStreamIsClosed = false;
     private long producerId;
     private short producerEpoch;
     private int baseSequence;
@@ -206,15 +215,26 @@ public class MemoryRecordsBuilder {
         this.baseSequence = baseSequence;
     }
 
+    /**
+     * Release resources required for record appends (e.g. compression buffers). Once this method is called, it's only
+     * possible to update the RecordBatch header.
+     */
+    public void closeForRecordAppends() {
+        if (!appendStreamIsClosed) {
+            try {
+                appendStream.close();
+                appendStreamIsClosed = true;
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        }
+    }
+
     public void close() {
         if (builtRecords != null)
             return;
 
-        try {
-            appendStream.close();
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
+        closeForRecordAppends();
 
         if (numRecords == 0L) {
             buffer().position(initPos);
@@ -233,6 +253,7 @@ public class MemoryRecordsBuilder {
     }
 
     private void writeDefaultBatchHeader() {
+        ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
         buffer.position(initPos);
@@ -257,6 +278,7 @@ public class MemoryRecordsBuilder {
     }
 
     private void writeLegacyCompressedWrapperHeader() {
+        ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
         buffer.position(initPos);
@@ -416,6 +438,7 @@ public class MemoryRecordsBuilder {
      * @param record The record to add
      */
     public void appendUncheckedWithOffset(long offset, LegacyRecord record) {
+        ensureOpenForRecordAppend();
         try {
             int size = record.sizeInBytes();
             AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);
@@ -469,6 +492,7 @@ public class MemoryRecordsBuilder {
 
     private long appendDefaultRecord(long offset, boolean isControlRecord, long timestamp,
                                      ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException {
+        ensureOpenForRecordAppend();
         int offsetDelta = (int) (offset - baseOffset);
         long timestampDelta = timestamp - baseTimestamp;
         long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
@@ -478,6 +502,7 @@ public class MemoryRecordsBuilder {
     }
 
     private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
+        ensureOpenForRecordAppend();
         if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
             timestamp = logAppendTime;
 
@@ -515,6 +540,18 @@ public class MemoryRecordsBuilder {
         }
     }
 
+    private void ensureOpenForRecordAppend() {
+        if (appendStreamIsClosed)
+            throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
+        if (isClosed())
+            throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed");
+    }
+
+    private void ensureOpenForRecordBatchWrite() {
+        if (isClosed())
+            throw new IllegalStateException("Tried to write record batch header, but MemoryRecordsBuilder is closed");
+    }
+
     /**
      * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}).
      * @return The estimated number of bytes written
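
The guards above imply three builder states: open, closed for record appends,
and fully closed. A self-contained model of the transitions, with assumed
names (this is not Kafka's code):

class BuilderStateSketch {
    private boolean appendStreamClosed = false; // set by closeForRecordAppends()
    private boolean fullyClosed = false;        // set by close()

    void appendRecord() {
        if (appendStreamClosed || fullyClosed)
            throw new IllegalStateException("closed for record appends");
        // ... write the record to the compressed append stream ...
    }

    void writeBatchHeader() {
        // Allowed while only the append stream is closed: the header lives in
        // the underlying buffer, not in the compressed stream.
        if (fullyClosed)
            throw new IllegalStateException("builder is closed");
        // ... patch the header bytes at the start of the buffer ...
    }

    void closeForRecordAppends() { appendStreamClosed = true; } // idempotent

    void close() {
        closeForRecordAppends();
        writeBatchHeader();
        fullyClosed = true;
    }
}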

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 95a6896..a052a9e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -578,8 +578,6 @@ class Log(@volatile var dir: File,
     val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
 
     for (batch <- records.batches.asScala) {
-      if (isFromClient && batch.magic >= RecordBatch.MAGIC_VALUE_V2 && shallowMessageCount > 0)
-        throw new InvalidRecordException("Client produce requests should not have more than one batch")
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
       // For magic version 2, we can get the first offset directly from the batch header.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 830f906..d0e8ec4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -499,7 +499,7 @@ private[log] class Cleaner(val id: Int,
     if (record.isControlRecord)
       return true
 
-    // retain the entry if it is the last one produced by an active idempotent producer to ensure that
+    // retain the record if it is the last one produced by an active idempotent producer to ensure that
     // the PID is not removed from the log before it has been expired
     if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
       return true
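
For illustration, a hypothetical Java rendering of this retention rule (the
code above is Scala; the method name and the Map shape are assumptions):

import java.util.Map;

class CleanerRetainSketch {
    static final long NO_PRODUCER_ID = -1L; // mirrors RecordBatch.NO_PRODUCER_ID

    static boolean shouldRetain(boolean isControlRecord, long pid, long recordOffset,
                                Map<Long, Long> lastOffsetByActivePid) {
        if (isControlRecord)
            return true;
        // Keep the last record written by an active idempotent producer so
        // its PID is not removed from the log before it has expired.
        Long lastOffset = lastOffsetByActivePid.get(pid);
        return pid > NO_PRODUCER_ID && lastOffset != null && lastOffset == recordOffset;
    }
}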