You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/28 18:05:57 UTC
kafka git commit: MINOR: Clean-up MemoryRecords variables and APIs
Repository: kafka
Updated Branches:
refs/heads/trunk 80223baa3 -> b6fe164dd
MINOR: Clean-up MemoryRecords variables and APIs
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jun Rao
Closes #348 from guozhangwang/MemoryRecordsCapacity
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6fe164d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6fe164d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6fe164d
Branch: refs/heads/trunk
Commit: b6fe164dd6f9483469c0b0661c24467e33e91cd9
Parents: 80223ba
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Oct 28 10:11:05 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 28 10:11:05 2015 -0700
----------------------------------------------------------------------
.../producer/internals/RecordAccumulator.java | 2 +-
.../clients/producer/internals/Sender.java | 2 +-
.../apache/kafka/common/record/Compressor.java | 4 +
.../kafka/common/record/MemoryRecords.java | 98 +++++++++++---------
.../org/apache/kafka/common/record/Records.java | 13 +--
.../clients/consumer/internals/FetcherTest.java | 3 +-
.../internals/RecordAccumulatorTest.java | 5 +-
.../kafka/common/record/MemoryRecordsTest.java | 2 -
8 files changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index eed2a5e..db61121 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -393,7 +393,7 @@ public final class RecordAccumulator {
*/
public void deallocate(RecordBatch batch) {
incomplete.remove(batch);
- free.deallocate(batch.records.buffer(), batch.records.capacity());
+ free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 134d45a..56be021 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -324,7 +324,7 @@ public class Sender implements Runnable {
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
- produceRecordsByPartition.put(tp, (ByteBuffer) batch.records.buffer().flip());
+ produceRecordsByPartition.put(tp, batch.records.buffer());
recordsByPartition.put(tp, batch);
}
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index e570b29..27f757a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -186,6 +186,10 @@ public class Compressor {
writtenUncompressed += size;
}
+ public long numRecordsWritten() {
+ return numRecords;
+ }
+
public long estimatedBytesWritten() {
if (type == CompressionType.NONE) {
return bufferStream.buffer().position();
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 5f1b45c..a5a56e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,7 +16,6 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
import java.util.Iterator;
import org.apache.kafka.common.KafkaException;
@@ -27,17 +26,28 @@ import org.apache.kafka.common.utils.AbstractIterator;
*/
public class MemoryRecords implements Records {
+ private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
+
+ // the compressor used for appends-only
private final Compressor compressor;
- private final int capacity;
- private final int sizeLimit;
+
+ // the write limit for writable buffer, which may be smaller than the buffer capacity
+ private final int writeLimit;
+
+ // the capacity of the initial buffer, which is only used for de-allocation of writable records
+ private final int initialCapacity;
+
+ // the underlying buffer used for read; while the records are still writable it is null
private ByteBuffer buffer;
+
+ // indicate if the memory records is writable or not (i.e. used for appends or read-only)
private boolean writable;
// Construct a writable memory records
- private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
+ private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
this.writable = writable;
- this.capacity = buffer.capacity();
- this.sizeLimit = sizeLimit;
+ this.writeLimit = writeLimit;
+ this.initialCapacity = buffer.capacity();
if (this.writable) {
this.buffer = null;
this.compressor = new Compressor(buffer, type);
@@ -47,16 +57,17 @@ public class MemoryRecords implements Records {
}
}
- public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) {
- return new MemoryRecords(buffer, type, true, capacity);
+ public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
+ return new MemoryRecords(buffer, type, true, writeLimit);
}
public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
+ // use the buffer capacity as the default write limit
return emptyRecords(buffer, type, buffer.capacity());
}
public static MemoryRecords readableRecords(ByteBuffer buffer) {
- return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
+ return new MemoryRecords(buffer, CompressionType.NONE, false, WRITE_LIMIT_FOR_READABLE_ONLY);
}
/**
@@ -90,25 +101,24 @@ public class MemoryRecords implements Records {
/**
* Check if we have room for a new record containing the given key/value pair
- *
+ *
* Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
* accurate if compression is really used. When this happens, the following append may cause dynamic buffer
* re-allocation in the underlying byte buffer stream.
- *
- * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
- * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
- * capacity will be the message size, but the size limit will still be the batch size), and when the records' size
- * has exceed this limit we also mark this record as full.
+ *
+ * There is an exceptional case when appending a single message whose size is larger than the batch size, the
+ * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case
+ * the checking should be based on the capacity of the initialized buffer rather than the write limit in order
+ * to accept this single record.
*/
public boolean hasRoomFor(byte[] key, byte[] value) {
- return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD +
- Record.recordSize(key, value) &&
- this.sizeLimit >= this.compressor.estimatedBytesWritten();
+ return this.writable && this.compressor.numRecordsWritten() == 0 ?
+ this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
+ this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
public boolean isFull() {
- return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
- this.sizeLimit <= this.compressor.estimatedBytesWritten();
+ return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
}
/**
@@ -116,22 +126,27 @@ public class MemoryRecords implements Records {
*/
public void close() {
if (writable) {
+ // close the compressor to fill-in wrapper message metadata if necessary
compressor.close();
- writable = false;
+
+ // flip the underlying buffer to be ready for reads
buffer = compressor.buffer();
- }
- }
+ buffer.flip();
- /** Write the records in this set to the given channel */
- public int writeTo(GatheringByteChannel channel) throws IOException {
- return channel.write(buffer);
+ // reset the writable flag
+ writable = false;
+ }
}
/**
* The size of this record set
*/
public int sizeInBytes() {
- return compressor.buffer().position();
+ if (writable) {
+ return compressor.buffer().position();
+ } else {
+ return compressor.buffer().limit();
+ }
}
/**
@@ -145,33 +160,32 @@ public class MemoryRecords implements Records {
}
/**
- * Return the capacity of the buffer
+ * Return the capacity of the initial buffer, for writable records
+ * it may be different from the current buffer's capacity
*/
- public int capacity() {
- return this.capacity;
+ public int initialCapacity() {
+ return this.initialCapacity;
}
/**
- * Get the byte buffer that backs this records instance
+ * Get the byte buffer that backs this records instance for reading
*/
public ByteBuffer buffer() {
- return buffer.duplicate();
- }
-
- /**
- * Return a flipped duplicate of the closed buffer to reading records
- */
- public ByteBuffer flip() {
if (writable)
- throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
+ throw new IllegalStateException("The memory records must not be writable any more before getting its underlying buffer");
- return (ByteBuffer) buffer.flip();
+ return buffer.duplicate();
}
@Override
public Iterator<LogEntry> iterator() {
- ByteBuffer copy = this.buffer.duplicate();
- return new RecordsIterator(copy, CompressionType.NONE, false);
+ if (writable) {
+ // flip on a duplicate buffer for reading
+ return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), CompressionType.NONE, false);
+ } else {
+ // do not need to flip for non-writable buffer
+ return new RecordsIterator(this.buffer.duplicate(), CompressionType.NONE, false);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index aa15561..d43cdab 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.common.record;
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
/**
* A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
* for the in-memory representation.
@@ -30,16 +27,8 @@ public interface Records extends Iterable<LogEntry> {
int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
/**
- * Write these records to the given channel
- * @param channel The channel to write to
- * @return The number of bytes written
- * @throws IOException If the write fails.
- */
- public int writeTo(GatheringByteChannel channel) throws IOException;
-
- /**
* The size of these records in bytes
*/
- public int sizeInBytes();
+ int sizeInBytes();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 957d8f9..1e7a215 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -98,7 +98,6 @@ public class FetcherTest {
records.append(2L, "key".getBytes(), "value-2".getBytes());
records.append(3L, "key".getBytes(), "value-3".getBytes());
records.close();
- records.flip();
}
@After
@@ -142,7 +141,7 @@ public class FetcherTest {
// resize the limit of the buffer to pretend it is only fetch-size large
fetcher.initFetches(cluster);
- client.prepareResponse(fetchResponse((ByteBuffer) records.flip().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse((ByteBuffer) records.buffer().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
fetcher.fetchedRecords();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 887499d..132b83b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -87,7 +87,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- batch.records.flip();
+
Iterator<LogEntry> iter = batch.records.iterator();
for (int i = 0; i < appends; i++) {
LogEntry entry = iter.next();
@@ -116,7 +116,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- batch.records.flip();
+
Iterator<LogEntry> iter = batch.records.iterator();
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -169,7 +169,6 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
if (batches != null) {
for (RecordBatch batch : batches) {
- batch.records.flip();
for (LogEntry entry : batch.records)
read++;
accum.deallocate(batch);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 8ec610a..e343327 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -50,9 +50,7 @@ public class MemoryRecordsTest {
recs2.append(i, toArray(r.key()), toArray(r.value()));
}
recs1.close();
- recs1.flip();
recs2.close();
- recs2.flip();
for (int iteration = 0; iteration < 2; iteration++) {
for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {