Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/30 19:53:48 UTC

kafka git commit: KAFKA-5316; Follow-up with ByteBufferOutputStream and other misc improvements

Repository: kafka
Updated Branches:
  refs/heads/trunk 6b0349791 -> b3788d8dc


KAFKA-5316; Follow-up with ByteBufferOutputStream and other misc improvements

ByteBufferOutputStream improvements:
* Document pitfalls
* Improve efficiency when dealing with direct byte buffers
* Improve handling of buffer expansion
* Be consistent about using `limit` instead of `capacity`
* Add constructors that allocate the internal buffer

Other minor changes:
* Fix log warning to specify correct Kafka version
* Clean-ups
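
For illustration, a minimal sketch of the new allocating constructors and the
expansion behaviour (using only constructors and methods introduced in the
ByteBufferOutputStream diff below):

    // Allocates a 16-byte heap buffer internally; pass `true` as a second
    // argument to allocate a direct buffer instead.
    ByteBufferOutputStream out = new ByteBufferOutputStream(16);
    out.write(new byte[32], 0, 32);   // exceeds 16 bytes, so the buffer expands
    ByteBuffer result = out.buffer(); // always re-fetch the buffer after writing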

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3166 from ijuma/minor-kafka-5316-follow-ups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3788d8d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3788d8d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3788d8d

Branch: refs/heads/trunk
Commit: b3788d8dcbeee7a20f562e878c187a75bac11ff0
Parents: 6b03497
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue May 30 12:53:32 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 30 12:53:32 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |  2 +-
 .../kafka/common/record/DefaultRecord.java      | 19 -----
 .../kafka/common/record/MemoryRecords.java      |  8 +-
 .../common/record/MemoryRecordsBuilder.java     |  7 +-
 .../common/utils/ByteBufferOutputStream.java    | 85 +++++++++++++-------
 .../kafka/common/record/DefaultRecordTest.java  | 18 +++--
 .../utils/ByteBufferOutputStreamTest.java       |  1 +
 7 files changed, 77 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e1f04a8..d3d1b82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -531,7 +531,7 @@ public final class RecordAccumulator {
      */
     public void deallocate(ProducerBatch batch) {
         incomplete.remove(batch);
-        // Only deallocate the batch if it is not a split batch because split batch are allocated aside the
+        // Only deallocate the batch if it is not a split batch because split batches are allocated outside the
         // buffer pool.
         if (!batch.isSplitBatch())
             free.deallocate(batch.buffer(), batch.initialCapacity());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 05b5bb2..e61bbc9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Checksums;
 import org.apache.kafka.common.utils.Crc32C;
@@ -230,24 +229,6 @@ public class DefaultRecord implements Record {
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
 
-    /**
-     * Write the record to `out` and return its size.
-     */
-    public static int writeTo(ByteBuffer out,
-                              int offsetDelta,
-                              long timestampDelta,
-                              ByteBuffer key,
-                              ByteBuffer value,
-                              Header[] headers) {
-        try {
-            return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta,
-                    key, value, headers);
-        } catch (IOException e) {
-            // cannot actually be raised by ByteBufferOutputStream
-            throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e);
-        }
-    }
-
     @Override
     public boolean hasMagic(byte magic) {
         return magic >= MAGIC_VALUE_V2;

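With the explicit ByteBuffer overload removed, callers go through the remaining
DataOutputStream overload, typically wrapped around a ByteBufferOutputStream (the
test changes further down follow this pattern; key, value, headers and the deltas
are placeholders):

    ByteBufferOutputStream out = new ByteBufferOutputStream(1024);
    // writeTo declares IOException, as the updated test signatures acknowledge
    DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta,
            key, value, headers);
    ByteBuffer buffer = out.buffer(); // may differ from the first buffer if it expanded
    buffer.flip();
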
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 56d7ed1..528095e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -191,7 +191,7 @@ public class MemoryRecords extends AbstractRecords {
 
                 if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
                     log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
-                                    "(new size is {}). Consumers from version 0.10.1 and earlier may need to " +
+                                    "(new size is {}). Consumers with versions earlier than 0.10.1.0 may need to " +
                                     "increase their fetch sizes.",
                             partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
 
@@ -527,13 +527,13 @@ public class MemoryRecords extends AbstractRecords {
         if (records.length == 0)
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
-        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+        ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(sizeEstimate);
         long logAppendTime = RecordBatch.NO_TIMESTAMP;
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType,
                 initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false,
-                partitionLeaderEpoch, buffer.capacity());
+                partitionLeaderEpoch, sizeEstimate);
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();

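The practical effect: the size estimate no longer needs to be exact, because the
builder's output stream grows if the estimate proves too small. A hedged usage
sketch (this particular withRecords overload is assumed from this file's other
overloads):

    // Even if the internal size estimate is low, the ByteBufferOutputStream
    // backing the builder expands instead of overflowing.
    MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
            new SimpleRecord("key".getBytes(), "value".getBytes()));
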
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 49d70c6..aaca851 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -56,7 +56,6 @@ public class MemoryRecordsBuilder {
     private final boolean isControlBatch;
     private final int partitionLeaderEpoch;
     private final int writeLimit;
-    private final int initialCapacity;
 
     private volatile float estimatedCompressionRatio;
 
@@ -115,7 +114,6 @@ public class MemoryRecordsBuilder {
         this.writeLimit = writeLimit;
 
         this.initialPosition = bufferStream.position();
-        this.initialCapacity = bufferStream.capacity();
 
         if (magic > RecordBatch.MAGIC_VALUE_V1) {
             bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET);
@@ -125,7 +123,6 @@ public class MemoryRecordsBuilder {
             bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
         }
 
-        // create the stream
         this.bufferStream = bufferStream;
         this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic,
                 COMPRESSION_DEFAULT_BUFFER_SIZE));
@@ -174,7 +171,7 @@ public class MemoryRecordsBuilder {
     }
 
     public int initialCapacity() {
-        return initialCapacity;
+        return bufferStream.initialCapacity();
     }
 
     public double compressionRatio() {
@@ -718,7 +715,7 @@ public class MemoryRecordsBuilder {
 
         // Be conservative and not take compression of the new record into consideration.
         return numRecords == 0 ?
-                this.initialCapacity >= recordSize :
+                bufferStream.remaining() >= recordSize :
                 this.writeLimit >= estimatedBytesWritten() + recordSize;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
index 79d4d4c..2b13e7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -20,40 +20,60 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
- * A ByteBuffer-backed OutputStream
+ * A ByteBuffer-backed OutputStream that expands the internal ByteBuffer as required. Given this, the caller should
+ * always access the underlying ByteBuffer via the {@link #buffer()} method until all writes are completed.
+ *
+ * This class is typically used for two purposes:
+ *
+ * 1. Write to a ByteBuffer when there is a chance that we may need to expand it in order to fit all the desired data
+ * 2. Write to a ByteBuffer via methods that expect an OutputStream interface
+ *
+ * Hard-to-track bugs can happen when this class is used for the second purpose and unexpected buffer expansion
+ * happens. So, it's best to assume that buffer expansion can always happen. An improvement would be to create a
+ * separate class that throws an error if buffer expansion is required, avoiding the issue altogether.
  */
 public class ByteBufferOutputStream extends OutputStream {
 
     private static final float REALLOCATION_FACTOR = 1.1f;
 
+    private final int initialCapacity;
+    private final int initialPosition;
     private ByteBuffer buffer;
-    private int initialPosition;
 
+    /**
+     * Creates an instance of this class that will write to the received `buffer` up to its `limit`. If necessary to
+     * satisfy `write` or `position` calls, larger buffers will be allocated so the {@link #buffer()} method may return
+     * a different buffer than the received `buffer` parameter.
+     *
+     * Prefer one of the constructors that allocate the internal buffer for clearer semantics.
+     */
     public ByteBufferOutputStream(ByteBuffer buffer) {
         this.buffer = buffer;
         this.initialPosition = buffer.position();
+        this.initialCapacity = buffer.capacity();
+    }
+
+    public ByteBufferOutputStream(int initialCapacity) {
+        this(initialCapacity, false);
+    }
+
+    public ByteBufferOutputStream(int initialCapacity, boolean directBuffer) {
+        this(directBuffer ? ByteBuffer.allocateDirect(initialCapacity) : ByteBuffer.allocate(initialCapacity));
     }
 
     public void write(int b) {
-        if (buffer.remaining() < 1)
-            expandBuffer(buffer.capacity() + 1);
+        maybeExpandBuffer(1);
         buffer.put((byte) b);
     }
 
     public void write(byte[] bytes, int off, int len) {
-        if (buffer.remaining() < len)
-            expandBuffer(buffer.position() + len);
+        maybeExpandBuffer(len);
         buffer.put(bytes, off, len);
     }
 
-    public void write(ByteBuffer buffer) {
-        if (buffer.hasArray())
-            write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
-        else {
-            int pos = buffer.position();
-            for (int i = pos, limit = buffer.remaining() + pos; i < limit; i++)
-                write(buffer.get(i));
-        }
+    public void write(ByteBuffer sourceBuffer) {
+        maybeExpandBuffer(sourceBuffer.remaining());
+        buffer.put(sourceBuffer);
     }
 
     public ByteBuffer buffer() {
@@ -64,8 +84,8 @@ public class ByteBufferOutputStream extends OutputStream {
         return buffer.position();
     }
 
-    public int capacity() {
-        return buffer.capacity();
+    public int remaining() {
+        return buffer.remaining();
     }
 
     public int limit() {
@@ -73,23 +93,32 @@ public class ByteBufferOutputStream extends OutputStream {
     }
 
     public void position(int position) {
-        if (position > buffer.limit())
-            expandBuffer(position);
+        maybeExpandBuffer(position - buffer.position());
         buffer.position(position);
     }
 
-    private void expandBuffer(int size) {
-        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
-        ByteBuffer temp = ByteBuffer.allocate(expandSize);
-        if (buffer.hasArray()) {
-            temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
-        } else {
-            int limit = buffer.position();
-            for (int i = 0; i < limit; i++)
-                temp.put(buffer.get(i));
-        }
+    /**
+     * The capacity of the first internal ByteBuffer used by this class. This is useful in cases where a pooled
+     * ByteBuffer was passed via the constructor and it needs to be returned to the pool.
+     */
+    public int initialCapacity() {
+        return initialCapacity;
+    }
 
+    private void maybeExpandBuffer(int remainingRequired) {
+        if (remainingRequired > buffer.remaining())
+            expandBuffer(remainingRequired);
+    }
+
+    private void expandBuffer(int remainingRequired) {
+        int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        int limit = limit();
+        buffer.flip();
+        temp.put(buffer);
+        buffer.limit(limit);
         // reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
+        // we should ideally only do this for the original buffer, but the additional complexity doesn't seem worth it
         buffer.position(initialPosition);
         buffer = temp;
     }

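Taken together, a short sketch of the pitfall the new javadoc warns about, using
only methods visible in the diff above:

    ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(8));
    ByteBuffer original = out.buffer();
    out.write(new byte[16], 0, 16);    // needs more room: a larger buffer is allocated
    ByteBuffer current = out.buffer(); // no longer the same object as `original`
    current.flip();                    // read results via buffer(), never a stale reference
    // a pooled `original` can still be returned to its pool via out.initialCapacity()
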
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 61b7b00..2c0ef05 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -18,8 +18,11 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -29,7 +32,7 @@ import static org.junit.Assert.assertNotNull;
 public class DefaultRecordTest {
 
     @Test
-    public void testBasicSerde() {
+    public void testBasicSerde() throws IOException {
         Header[] headers = new Header[] {
             new RecordHeader("foo", "value".getBytes()),
             new RecordHeader("bar", (byte[]) null),
@@ -51,8 +54,10 @@ public class DefaultRecordTest {
             long baseTimestamp = System.currentTimeMillis();
             long timestampDelta = 323;
 
-            ByteBuffer buffer = ByteBuffer.allocate(1024);
-            DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value(), record.headers());
+            ByteBufferOutputStream out = new ByteBufferOutputStream(1024);
+            DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta, record.key(), record.value(),
+                    record.headers());
+            ByteBuffer buffer = out.buffer();
             buffer.flip();
 
             DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
@@ -69,7 +74,7 @@ public class DefaultRecordTest {
     }
 
     @Test
-    public void testSerdeNoSequence() {
+    public void testSerdeNoSequence() throws IOException {
         ByteBuffer key = ByteBuffer.wrap("hi".getBytes());
         ByteBuffer value = ByteBuffer.wrap("there".getBytes());
         long baseOffset = 37;
@@ -77,8 +82,9 @@ public class DefaultRecordTest {
         long baseTimestamp = System.currentTimeMillis();
         long timestampDelta = 323;
 
-        ByteBuffer buffer = ByteBuffer.allocate(1024);
-        DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value, new Header[0]);
+        ByteBufferOutputStream out = new ByteBufferOutputStream(1024);
+        DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta, key, value, new Header[0]);
+        ByteBuffer buffer = out.buffer();
         buffer.flip();
 
         DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
index 2ef5672..fbac719 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -94,6 +94,7 @@ public class ByteBufferOutputStreamTest {
 
         ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
         output.write(input);
+        assertEquals(8, input.position());
         assertEquals(8, output.position());
         assertEquals(value, output.buffer().getLong(0));
     }
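
A note on the assertion added above: the rewritten write(ByteBuffer) delegates to
ByteBuffer.put(ByteBuffer), which advances the source buffer's position, whereas
the old implementation copied via array or absolute index access and left the
source untouched. Callers that reuse the source buffer should rewind it first,
e.g. (a sketch based on the test above):

    output.write(input); // consumes `input`: its position advances to its limit
    input.rewind();      // restore the position before reading `input` again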