Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/30 19:53:48 UTC
kafka git commit: KAFKA-5316; Follow-up with ByteBufferOutputStream and other misc improvements
Repository: kafka
Updated Branches:
refs/heads/trunk 6b0349791 -> b3788d8dc
KAFKA-5316; Follow-up with ByteBufferOutputStream and other misc improvements
ByteBufferOutputStream improvements:
* Document pitfalls
* Improve efficiency when dealing with direct byte buffers
* Improve handling of buffer expansion
* Be consistent about using `limit` instead of `capacity`
* Add constructors that allocate the internal buffer
Other minor changes:
* Fix log warning to specify correct Kafka version
* Clean-ups
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3166 from ijuma/minor-kafka-5316-follow-ups
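The pitfall the new ByteBufferOutputStream Javadoc documents (see the diff below) is easiest to see in a minimal sketch. This is hypothetical usage, not part of the commit: once a write forces expansion, any previously captured reference to the internal ByteBuffer is stale, so callers must re-fetch it via buffer().

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class ExpansionPitfallSketch {
        public static void main(String[] args) {
            // A deliberately small buffer so the first write forces expansion
            ByteBuffer original = ByteBuffer.allocate(4);
            ByteBufferOutputStream out = new ByteBufferOutputStream(original);

            out.write(new byte[8], 0, 8); // exceeds the limit; a larger internal buffer is allocated

            // The data lives in the replacement buffer, not in `original`
            System.out.println(out.buffer() == original); // false
            System.out.println(out.position());           // 8
        }
    }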
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3788d8d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3788d8d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3788d8d
Branch: refs/heads/trunk
Commit: b3788d8dcbeee7a20f562e878c187a75bac11ff0
Parents: 6b03497
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue May 30 12:53:32 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 30 12:53:32 2017 -0700
----------------------------------------------------------------------
.../producer/internals/RecordAccumulator.java | 2 +-
.../kafka/common/record/DefaultRecord.java | 19 -----
.../kafka/common/record/MemoryRecords.java | 8 +-
.../common/record/MemoryRecordsBuilder.java | 7 +-
.../common/utils/ByteBufferOutputStream.java | 85 +++++++++++++-------
.../kafka/common/record/DefaultRecordTest.java | 18 +++--
.../utils/ByteBufferOutputStreamTest.java | 1 +
7 files changed, 77 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e1f04a8..d3d1b82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -531,7 +531,7 @@ public final class RecordAccumulator {
*/
public void deallocate(ProducerBatch batch) {
incomplete.remove(batch);
- // Only deallocate the batch if it is not a split batch because split batch are allocated aside the
+ // Only deallocate the batch if it is not a split batch because split batches are allocated outside the
// buffer pool.
if (!batch.isSplitBatch())
free.deallocate(batch.buffer(), batch.initialCapacity());
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 05b5bb2..e61bbc9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Checksums;
import org.apache.kafka.common.utils.Crc32C;
@@ -230,24 +229,6 @@ public class DefaultRecord implements Record {
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
- /**
- * Write the record to `out` and return its size.
- */
- public static int writeTo(ByteBuffer out,
- int offsetDelta,
- long timestampDelta,
- ByteBuffer key,
- ByteBuffer value,
- Header[] headers) {
- try {
- return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta,
- key, value, headers);
- } catch (IOException e) {
- // cannot actually be raised by ByteBufferOutputStream
- throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e);
- }
- }
-
@Override
public boolean hasMagic(byte magic) {
return magic >= MAGIC_VALUE_V2;
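With the ByteBuffer overload removed, callers go through the DataOutputStream variant backed by a growable stream, as the updated tests later in this diff do. A minimal sketch of that path (the wrapper class and helper name are hypothetical):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.record.DefaultRecord;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class WriteToSketch {
        // Serialize a record body; the stream grows on demand, so 1024 is only a hint
        public static ByteBuffer serialize(int offsetDelta, long timestampDelta, ByteBuffer key,
                                           ByteBuffer value, Header[] headers) throws IOException {
            ByteBufferOutputStream out = new ByteBufferOutputStream(1024);
            DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta, key, value, headers);
            ByteBuffer buffer = out.buffer();
            buffer.flip(); // ready to be read back, e.g. by DefaultRecord.readFrom
            return buffer;
        }
    }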
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 56d7ed1..528095e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -191,7 +191,7 @@ public class MemoryRecords extends AbstractRecords {
if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
- "(new size is {}). Consumers from version 0.10.1 and earlier may need to " +
+ "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
"increase their fetch sizes.",
partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
@@ -527,13 +527,13 @@ public class MemoryRecords extends AbstractRecords {
if (records.length == 0)
return MemoryRecords.EMPTY;
int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
- ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+ ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(sizeEstimate);
long logAppendTime = RecordBatch.NO_TIMESTAMP;
if (timestampType == TimestampType.LOG_APPEND_TIME)
logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType,
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType,
initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false,
- partitionLeaderEpoch, buffer.capacity());
+ partitionLeaderEpoch, sizeEstimate);
for (SimpleRecord record : records)
builder.append(record);
return builder.build();
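Since the builder now owns a growable stream instead of a fixed ByteBuffer, the size estimate is only a starting point: records that overflow it trigger expansion rather than an error. A sketch of the construction pattern above with concrete values (the constants and the deliberately small estimate are illustrative; the constructor signature is as used in this hunk):

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.record.TimestampType;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class BuilderSketch {
        public static MemoryRecords build() {
            int sizeEstimate = 64; // intentionally small; the stream grows if records overflow it
            ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(sizeEstimate);
            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream,
                    RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.CREATE_TIME,
                    0L, RecordBatch.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
                    RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
                    RecordBatch.NO_PARTITION_LEADER_EPOCH, sizeEstimate);
            builder.append(new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), "value".getBytes()));
            return builder.build();
        }
    }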
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 49d70c6..aaca851 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -56,7 +56,6 @@ public class MemoryRecordsBuilder {
private final boolean isControlBatch;
private final int partitionLeaderEpoch;
private final int writeLimit;
- private final int initialCapacity;
private volatile float estimatedCompressionRatio;
@@ -115,7 +114,6 @@ public class MemoryRecordsBuilder {
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
- this.initialCapacity = bufferStream.capacity();
if (magic > RecordBatch.MAGIC_VALUE_V1) {
bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET);
@@ -125,7 +123,6 @@ public class MemoryRecordsBuilder {
bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
}
- // create the stream
this.bufferStream = bufferStream;
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic,
COMPRESSION_DEFAULT_BUFFER_SIZE));
@@ -174,7 +171,7 @@ public class MemoryRecordsBuilder {
}
public int initialCapacity() {
- return initialCapacity;
+ return bufferStream.initialCapacity();
}
public double compressionRatio() {
@@ -718,7 +715,7 @@ public class MemoryRecordsBuilder {
// Be conservative and not take compression of the new record into consideration.
return numRecords == 0 ?
- this.initialCapacity >= recordSize :
+ bufferStream.remaining() >= recordSize :
this.writeLimit >= estimatedBytesWritten() + recordSize;
}
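The changed branch previously compared recordSize against the buffer's full initial capacity even though earlier bytes (such as the batch header) had already consumed part of it; bufferStream.remaining() reflects the space actually left. A toy restatement with hypothetical numbers showing how the two checks can disagree:

    public class RoomCheckSketch {
        public static void main(String[] args) {
            int initialCapacity = 100;
            int bytesAlreadyWritten = 61; // e.g. the record batch header
            int remaining = initialCapacity - bytesAlreadyWritten;
            int recordSize = 50;

            System.out.println(initialCapacity >= recordSize); // true: old check over-reports room
            System.out.println(remaining >= recordSize);       // false: new check sees the real space
        }
    }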
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
index 79d4d4c..2b13e7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -20,40 +20,60 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
- * A ByteBuffer-backed OutputStream
+ * A ByteBuffer-backed OutputStream that expands the internal ByteBuffer as required. Given this, the caller should
+ * always access the underlying ByteBuffer via the {@link #buffer()} method until all writes are completed.
+ *
+ * This class is typically used for two purposes:
+ *
+ * 1. Write to a ByteBuffer when there is a chance that we may need to expand it in order to fit all the desired data
+ * 2. Write to a ByteBuffer via methods that expect an OutputStream interface
+ *
+ * Hard-to-track bugs can happen when this class is used for the second purpose and unexpected buffer expansion
+ * happens. So, it's best to assume that buffer expansion can always happen. An improvement would be a separate
+ * class that throws an error if buffer expansion is required, avoiding the issue altogether.
*/
public class ByteBufferOutputStream extends OutputStream {
private static final float REALLOCATION_FACTOR = 1.1f;
+ private final int initialCapacity;
+ private final int initialPosition;
private ByteBuffer buffer;
- private int initialPosition;
+ /**
+ * Creates an instance of this class that will write to the received `buffer` up to its `limit`. If necessary to
+ * satisfy `write` or `position` calls, larger buffers will be allocated so the {@link #buffer()} method may return
+ * a different buffer than the received `buffer` parameter.
+ *
+ * Prefer one of the constructors that allocate the internal buffer for clearer semantics.
+ */
public ByteBufferOutputStream(ByteBuffer buffer) {
this.buffer = buffer;
this.initialPosition = buffer.position();
+ this.initialCapacity = buffer.capacity();
+ }
+
+ public ByteBufferOutputStream(int initialCapacity) {
+ this(initialCapacity, false);
+ }
+
+ public ByteBufferOutputStream(int initialCapacity, boolean directBuffer) {
+ this(directBuffer ? ByteBuffer.allocateDirect(initialCapacity) : ByteBuffer.allocate(initialCapacity));
}
public void write(int b) {
- if (buffer.remaining() < 1)
- expandBuffer(buffer.capacity() + 1);
+ maybeExpandBuffer(1);
buffer.put((byte) b);
}
public void write(byte[] bytes, int off, int len) {
- if (buffer.remaining() < len)
- expandBuffer(buffer.position() + len);
+ maybeExpandBuffer(len);
buffer.put(bytes, off, len);
}
- public void write(ByteBuffer buffer) {
- if (buffer.hasArray())
- write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
- else {
- int pos = buffer.position();
- for (int i = pos, limit = buffer.remaining() + pos; i < limit; i++)
- write(buffer.get(i));
- }
+ public void write(ByteBuffer sourceBuffer) {
+ maybeExpandBuffer(sourceBuffer.remaining());
+ buffer.put(sourceBuffer);
}
public ByteBuffer buffer() {
@@ -64,8 +84,8 @@ public class ByteBufferOutputStream extends OutputStream {
return buffer.position();
}
- public int capacity() {
- return buffer.capacity();
+ public int remaining() {
+ return buffer.remaining();
}
public int limit() {
@@ -73,23 +93,32 @@ public class ByteBufferOutputStream extends OutputStream {
}
public void position(int position) {
- if (position > buffer.limit())
- expandBuffer(position);
+ maybeExpandBuffer(position - buffer.position());
buffer.position(position);
}
- private void expandBuffer(int size) {
- int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
- ByteBuffer temp = ByteBuffer.allocate(expandSize);
- if (buffer.hasArray()) {
- temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
- } else {
- int limit = buffer.position();
- for (int i = 0; i < limit; i++)
- temp.put(buffer.get(i));
- }
+ /**
+ * The capacity of the first internal ByteBuffer used by this class. This is useful in cases where a pooled
+ * ByteBuffer was passed via the constructor and it needs to be returned to the pool.
+ */
+ public int initialCapacity() {
+ return initialCapacity;
+ }
+ private void maybeExpandBuffer(int remainingRequired) {
+ if (remainingRequired > buffer.remaining())
+ expandBuffer(remainingRequired);
+ }
+
+ private void expandBuffer(int remainingRequired) {
+ int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
+ ByteBuffer temp = ByteBuffer.allocate(expandSize);
+ int limit = limit();
+ buffer.flip();
+ temp.put(buffer);
+ buffer.limit(limit);
// reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
+ // we should ideally only do this for the original buffer, but the additional complexity doesn't seem worth it
buffer.position(initialPosition);
buffer = temp;
}
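Putting the hunk together, a short usage sketch (hypothetical values): the allocating constructors remove the need to pre-allocate a ByteBuffer, write(ByteBuffer) now drains its source via put(), and initialCapacity() survives expansion so a pooled buffer can be returned with the right size.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class RevisedApiSketch {
        public static void main(String[] args) {
            // Allocating constructor: heap-backed by default, direct if requested
            ByteBufferOutputStream out = new ByteBufferOutputStream(16);

            out.write(new byte[20], 0, 20); // 20 > 16, so the internal buffer is replaced
            System.out.println(out.position());        // 20
            System.out.println(out.initialCapacity()); // still 16, e.g. for pool accounting

            // write(ByteBuffer) delegates to put(), which advances the source's position
            ByteBuffer src = ByteBuffer.wrap("hello".getBytes());
            out.write(src);
            System.out.println(src.remaining()); // 0: the source was fully consumed
        }
    }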
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 61b7b00..2c0ef05 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -18,8 +18,11 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertArrayEquals;
@@ -29,7 +32,7 @@ import static org.junit.Assert.assertNotNull;
public class DefaultRecordTest {
@Test
- public void testBasicSerde() {
+ public void testBasicSerde() throws IOException {
Header[] headers = new Header[] {
new RecordHeader("foo", "value".getBytes()),
new RecordHeader("bar", (byte[]) null),
@@ -51,8 +54,10 @@ public class DefaultRecordTest {
long baseTimestamp = System.currentTimeMillis();
long timestampDelta = 323;
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value(), record.headers());
+ ByteBufferOutputStream out = new ByteBufferOutputStream(1024);
+ DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta, record.key(), record.value(),
+ record.headers());
+ ByteBuffer buffer = out.buffer();
buffer.flip();
DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
@@ -69,7 +74,7 @@ public class DefaultRecordTest {
}
@Test
- public void testSerdeNoSequence() {
+ public void testSerdeNoSequence() throws IOException {
ByteBuffer key = ByteBuffer.wrap("hi".getBytes());
ByteBuffer value = ByteBuffer.wrap("there".getBytes());
long baseOffset = 37;
@@ -77,8 +82,9 @@ public class DefaultRecordTest {
long baseTimestamp = System.currentTimeMillis();
long timestampDelta = 323;
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value, new Header[0]);
+ ByteBufferOutputStream out = new ByteBufferOutputStream(1024);
+ DefaultRecord.writeTo(new DataOutputStream(out), offsetDelta, timestampDelta, key, value, new Header[0]);
+ ByteBuffer buffer = out.buffer();
buffer.flip();
DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3788d8d/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
index 2ef5672..fbac719 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -94,6 +94,7 @@ public class ByteBufferOutputStreamTest {
ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
output.write(input);
+ assertEquals(8, input.position());
assertEquals(8, output.position());
assertEquals(value, output.buffer().getLong(0));
}
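The new assertion pins down a behavioral change: write(ByteBuffer) now drains its source, whereas the old implementation read it without moving its position. A caller that still needs the source afterwards can pass a duplicate; this guard is hypothetical, not part of the commit:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class DuplicateGuardSketch {
        public static void main(String[] args) {
            ByteBufferOutputStream output = new ByteBufferOutputStream(32);
            ByteBuffer input = ByteBuffer.allocate(8);
            input.putLong(0, 0xDEADBEEFL); // absolute put: position stays at 0

            // duplicate() shares the bytes but has independent position/limit,
            // so draining the duplicate leaves `input` readable
            output.write(input.duplicate());

            System.out.println(input.position());  // 0
            System.out.println(output.position()); // 8
        }
    }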