Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/15 18:01:36 UTC

[kafka] branch 1.1 updated: KAFKA-6512: Discard references to buffers used for compression (#4570)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new cb7315b  KAFKA-6512: Discard references to buffers used for compression (#4570)
cb7315b is described below

commit cb7315b91b132ecf9194dbe3fc77c2faad0a1758
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Feb 15 17:36:44 2018 +0000

    KAFKA-6512: Discard references to buffers used for compression (#4570)
    
    ProducerBatch retains references to MemoryRecordsBuilder and cannot be freed until acks are received. Removing references to buffers used for compression after records are built will enable these to be garbage collected sooner, reducing the risk of OOM.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Lothsahn <Lo...@gmail.com>
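
The pattern in this change generalizes beyond LZ4: the long-lived builder object is kept, but its reference to the compression stream (and the large block buffers that stream holds) is swapped for a small sentinel once appends are finished, so the buffers become unreachable even while the builder itself is still retained. Below is a minimal, self-contained sketch of that idea; the class and method names are invented for illustration and GZIP stands in for LZ4, so this is not the Kafka code itself.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    // Hypothetical illustration only: a builder that compresses appended records
    // into temporary buffers, then drops those references once the output is
    // built, so only the small result stays reachable while the builder object
    // is still held by its owner (e.g. until acks are received).
    public class CompressedBatchBuilder {
        // Sentinel stream that rejects writes once the builder is closed for
        // appends; using a sentinel instead of a boolean flag lets the real
        // stream (and the buffers it holds) be discarded as soon as appends end.
        private static final OutputStream CLOSED_STREAM = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw new IllegalStateException("Builder is closed for appends");
            }
        };

        private final ByteArrayOutputStream result = new ByteArrayOutputStream();
        private OutputStream appendStream;

        public CompressedBatchBuilder() throws IOException {
            // GZIPOutputStream allocates internal deflater buffers, standing in
            // for the LZ4 block buffers in the real code.
            this.appendStream = new GZIPOutputStream(result);
        }

        public void append(byte[] record) throws IOException {
            if (appendStream == CLOSED_STREAM)
                throw new IllegalStateException("Closed for appends");
            appendStream.write(record);
        }

        public byte[] build() throws IOException {
            closeForAppends();
            return result.toByteArray();
        }

        private void closeForAppends() throws IOException {
            if (appendStream != CLOSED_STREAM) {
                try {
                    appendStream.close();          // flush the final block into 'result'
                } finally {
                    appendStream = CLOSED_STREAM;  // discard the compressor and its buffers
                }
            }
        }
    }

With this shape, a ProducerBatch-style owner can hold on to the builder until acknowledgements arrive without also pinning the compression buffers in memory.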
---
 .../common/record/KafkaLZ4BlockOutputStream.java   | 40 ++++++++++++++--------
 .../kafka/common/record/MemoryRecordsBuilder.java  | 21 ++++++++----
 .../common/record/MemoryRecordsBuilderTest.java    | 34 ++++++++++++++++++
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 8cfc37b..591ab16 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -34,7 +33,7 @@ import net.jpountz.xxhash.XXHashFactory;
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
     public static final int MAGIC = 0x184D2204;
     public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
     private int bufferOffset;
     private boolean finished;
 
@@ -71,7 +71,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-        super(out);
+        this.out = out;
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     private void writeEndMark() throws IOException {
         ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
-        finished = true;
     }
 
     @Override
@@ -259,15 +258,26 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
     @Override
     public void close() throws IOException {
-        if (!finished) {
-            // basically flush the buffer writing the last block
-            writeBlock();
-            // write the end block and finish the stream
-            writeEndMark();
-        }
-        if (out != null) {
-            out.close();
-            out = null;
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a9b57ac..6f6404f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+            throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
+        }
+    });
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
-    // Used to append records, may compress data on the fly
-    private final DataOutputStream appendStream;
     // Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access
     // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough,
     // so it's not safe to hold a direct reference to the underlying ByteBuffer.
@@ -60,7 +65,8 @@ public class MemoryRecordsBuilder {
     // from previous batches before appending any records.
     private float estimatedCompressionRatio = 1.0F;
 
-    private boolean appendStreamIsClosed = false;
+    // Used to append records, may compress data on the fly
+    private DataOutputStream appendStream;
     private boolean isTransactional;
     private long producerId;
     private short producerEpoch;
@@ -265,12 +271,13 @@ public class MemoryRecordsBuilder {
      * possible to update the RecordBatch header.
      */
     public void closeForRecordAppends() {
-        if (!appendStreamIsClosed) {
+        if (appendStream != CLOSED_STREAM) {
             try {
                 appendStream.close();
-                appendStreamIsClosed = true;
             } catch (IOException e) {
                 throw new KafkaException(e);
+            } finally {
+                appendStream = CLOSED_STREAM;
             }
         }
     }
@@ -663,7 +670,7 @@ public class MemoryRecordsBuilder {
     }
 
     private void ensureOpenForRecordAppend() {
-        if (appendStreamIsClosed)
+        if (appendStream == CLOSED_STREAM)
             throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
     }
 
@@ -738,7 +745,7 @@ public class MemoryRecordsBuilder {
     public boolean isFull() {
         // note that the write limit is respected only after the first record is added which ensures we can always
         // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
-        return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
+        return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c713d17..a90fb29 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -644,6 +645,39 @@ public class MemoryRecordsBuilderTest {
         return values;
     }
 
+    @Test
+    public void testBuffersDereferencedOnClose() {
+        Runtime runtime = Runtime.getRuntime();
+        int payloadLen = 1024 * 1024;
+        ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
+        byte[] key = new byte[0];
+        byte[] value = new byte[payloadLen];
+        new Random().nextBytes(value); // Use random payload so that compressed buffer is large
+        List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
+        long startMem = 0;
+        long memUsed = 0;
+        int iterations =  0;
+        while (iterations++ < 100) {
+            buffer.rewind();
+            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+                    TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
+            builder.append(1L, new byte[0], value);
+            builder.build();
+            builders.add(builder);
+
+            System.gc();
+            memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
+            // Ignore memory usage during initialization
+            if (iterations == 2)
+                startMem = memUsed;
+            else if (iterations > 2 && memUsed < (iterations - 2) * 1024)
+                break;
+        }
+        assertTrue("Memory usage too high: " + memUsed, iterations < 100);
+    }
+
     private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int numRecords,
             int numRecordsConverted, long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);
