Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/28 18:05:57 UTC

kafka git commit: MINOR: Clean-up MemoryRecords variables and APIs

Repository: kafka
Updated Branches:
  refs/heads/trunk 80223baa3 -> b6fe164dd


MINOR: Clean-up MemoryRecords variables and APIs

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jun Rao

Closes #348 from guozhangwang/MemoryRecordsCapacity


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6fe164d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6fe164d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6fe164d

Branch: refs/heads/trunk
Commit: b6fe164dd6f9483469c0b0661c24467e33e91cd9
Parents: 80223ba
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Oct 28 10:11:05 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 28 10:11:05 2015 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |  2 +-
 .../clients/producer/internals/Sender.java      |  2 +-
 .../apache/kafka/common/record/Compressor.java  |  4 +
 .../kafka/common/record/MemoryRecords.java      | 98 +++++++++++---------
 .../org/apache/kafka/common/record/Records.java | 13 +--
 .../clients/consumer/internals/FetcherTest.java |  3 +-
 .../internals/RecordAccumulatorTest.java        |  5 +-
 .../kafka/common/record/MemoryRecordsTest.java  |  2 -
 8 files changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index eed2a5e..db61121 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -393,7 +393,7 @@ public final class RecordAccumulator {
      */
     public void deallocate(RecordBatch batch) {
         incomplete.remove(batch);
-        free.deallocate(batch.records.buffer(), batch.records.capacity());
+        free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
     }
     
     /**
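
The switch from capacity() to initialCapacity() matters because compression can grow a batch's buffer after the BufferPool hands it out: the pool must be credited with the size it originally allocated, not with whatever the Compressor's stream has since re-allocated. Below is a rough sketch of that lifecycle using only the MemoryRecords methods shown in this diff; the 16 KB batch size, the GZIP payload, and the DeallocateSketch class name are made up for illustration.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;

    public class DeallocateSketch {
        public static void main(String[] args) {
            int batchSize = 16 * 1024;
            // stands in for a buffer handed out by the producer's BufferPool
            ByteBuffer pooled = ByteBuffer.allocate(batchSize);
            MemoryRecords records = MemoryRecords.emptyRecords(pooled, CompressionType.GZIP, batchSize);

            // appends go through the Compressor, whose output stream may re-allocate a larger
            // buffer when the compressed-size estimate turns out to be too optimistic
            while (records.hasRoomFor("key".getBytes(), new byte[1024]))
                records.append(0L, "key".getBytes(), new byte[1024]);
            records.close();

            System.out.println(records.initialCapacity());    // still batchSize: what the pool accounted for
            System.out.println(records.buffer().capacity());   // may be larger after re-allocation
            // free.deallocate(records.buffer(), records.initialCapacity());   // as deallocate() now does
        }
    }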

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 134d45a..56be021 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -324,7 +324,7 @@ public class Sender implements Runnable {
         final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
         for (RecordBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
-            produceRecordsByPartition.put(tp, (ByteBuffer) batch.records.buffer().flip());
+            produceRecordsByPartition.put(tp, batch.records.buffer());
             recordsByPartition.put(tp, batch);
         }
         ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
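
The cast-and-flip in the Sender goes away because a closed batch's buffer is already flipped (see the MemoryRecords change below), and buffer() hands back a read-ready duplicate. A minimal sketch of the contract the Sender now relies on; the single append and the 1 KB buffer are arbitrary.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;

    public class CloseFlipsSketch {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
            records.append(0L, "k".getBytes(), "v".getBytes());
            records.close();                         // fills in wrapper metadata if needed and flips the buffer

            ByteBuffer payload = records.buffer();   // duplicate at position 0, limit == sizeInBytes()
            System.out.println(payload.remaining() == records.sizeInBytes());   // true, no manual flip() required
            // produceRecordsByPartition.put(tp, payload);   // as the Sender now does
        }
    }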

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index e570b29..27f757a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -186,6 +186,10 @@ public class Compressor {
         writtenUncompressed += size;
     }
 
+    public long numRecordsWritten() {
+        return numRecords;
+    }
+
     public long estimatedBytesWritten() {
         if (type == CompressionType.NONE) {
             return bufferStream.buffer().position();
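
The new getter exists so that MemoryRecords.hasRoomFor() (below) can tell whether anything has been appended yet: only the very first record is checked against the initial buffer capacity, which lets a single record larger than the configured batch size get through. A sketch of that behaviour follows; the sizes are invented, but the buffer is allocated roughly the way RecordAccumulator sizes a batch for an oversized record.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.Records;

    public class OversizedRecordSketch {
        public static void main(String[] args) {
            int batchSize = 1024;                    // the write limit
            byte[] key = "k".getBytes();
            byte[] value = new byte[4096];           // a single record bigger than the batch size

            int needed = Records.LOG_OVERHEAD + Record.recordSize(key, value);
            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(needed), CompressionType.NONE, batchSize);

            System.out.println(records.hasRoomFor(key, value)); // true: nothing written yet, checked against initialCapacity()
            records.append(0L, key, value);
            System.out.println(records.isFull());               // true: bytes written now exceed the write limit
            System.out.println(records.hasRoomFor(key, value)); // false: later appends are checked against the write limit
        }
    }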

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 5f1b45c..a5a56e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,7 +16,6 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
 import java.util.Iterator;
 
 import org.apache.kafka.common.KafkaException;
@@ -27,17 +26,28 @@ import org.apache.kafka.common.utils.AbstractIterator;
  */
 public class MemoryRecords implements Records {
 
+    private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
+
+    // the compressor, used only for appends
     private final Compressor compressor;
-    private final int capacity;
-    private final int sizeLimit;
+
+    // the write limit for the writable buffer, which may be smaller than the buffer capacity
+    private final int writeLimit;
+
+    // the capacity of the initial buffer, which is only used for de-allocation of writable records
+    private final int initialCapacity;
+
+    // the underlying buffer used for reads; while the records are still writable it is null
     private ByteBuffer buffer;
+
+    // indicates whether the memory records is still writable (i.e. used for appends) or read-only
     private boolean writable;
 
     // Construct a writable memory records
-    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
+    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
         this.writable = writable;
-        this.capacity = buffer.capacity();
-        this.sizeLimit = sizeLimit;
+        this.writeLimit = writeLimit;
+        this.initialCapacity = buffer.capacity();
         if (this.writable) {
             this.buffer = null;
             this.compressor = new Compressor(buffer, type);
@@ -47,16 +57,17 @@ public class MemoryRecords implements Records {
         }
     }
 
-    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) {
-        return new MemoryRecords(buffer, type, true, capacity);
+    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
+        return new MemoryRecords(buffer, type, true, writeLimit);
     }
 
     public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
+        // use the buffer capacity as the default write limit
         return emptyRecords(buffer, type, buffer.capacity());
     }
 
     public static MemoryRecords readableRecords(ByteBuffer buffer) {
-        return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
+        return new MemoryRecords(buffer, CompressionType.NONE, false, WRITE_LIMIT_FOR_READABLE_ONLY);
     }
 
     /**
@@ -90,25 +101,24 @@ public class MemoryRecords implements Records {
 
     /**
      * Check if we have room for a new record containing the given key/value pair
-     * 
+     *
      * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
      * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
      * re-allocation in the underlying byte buffer stream.
-     * 
-     * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
-     * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
-     * capacity will be the message size, but the size limit will still be the batch size), and when the records' size
-     * has exceed this limit we also mark this record as full.
+     *
+     * There is one exceptional case: when appending a single message whose size is larger than the batch size, the
+     * buffer capacity will be the message size, which is larger than the write limit (i.e. the batch size). In this
+     * case the check is based on the capacity of the initial buffer rather than on the write limit, so that this
+     * single record is still accepted.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD +
-                                                 Record.recordSize(key, value) &&
-               this.sizeLimit >= this.compressor.estimatedBytesWritten();
+        return this.writable && this.compressor.numRecordsWritten() == 0 ?
+            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
+            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
     }
 
     public boolean isFull() {
-        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
-               this.sizeLimit <= this.compressor.estimatedBytesWritten();
+        return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
     }
 
     /**
@@ -116,22 +126,27 @@ public class MemoryRecords implements Records {
      */
     public void close() {
         if (writable) {
+            // close the compressor to fill in the wrapper message metadata if necessary
             compressor.close();
-            writable = false;
+
+            // flip the underlying buffer to be ready for reads
             buffer = compressor.buffer();
-        }
-    }
+            buffer.flip();
 
-    /** Write the records in this set to the given channel */
-    public int writeTo(GatheringByteChannel channel) throws IOException {
-        return channel.write(buffer);
+            // reset the writable flag
+            writable = false;
+        }
     }
 
     /**
      * The size of this record set
      */
     public int sizeInBytes() {
-        return compressor.buffer().position();
+        if (writable) {
+            return compressor.buffer().position();
+        } else {
+            return compressor.buffer().limit();
+        }
     }
 
     /**
@@ -145,33 +160,32 @@ public class MemoryRecords implements Records {
     }
 
     /**
-     * Return the capacity of the buffer
+     * Return the capacity of the initial buffer; for writable records
+     * it may differ from the capacity of the current buffer
      */
-    public int capacity() {
-        return this.capacity;
+    public int initialCapacity() {
+        return this.initialCapacity;
     }
 
     /**
-     * Get the byte buffer that backs this records instance
+     * Get the byte buffer that backs this records instance for reading
      */
     public ByteBuffer buffer() {
-        return buffer.duplicate();
-    }
-
-    /**
-     * Return a flipped duplicate of the closed buffer to reading records
-     */
-    public ByteBuffer flip() {
         if (writable)
-            throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
+            throw new IllegalStateException("The memory records must not be writable any more before getting its underlying buffer");
 
-        return (ByteBuffer) buffer.flip();
+        return buffer.duplicate();
     }
 
     @Override
     public Iterator<LogEntry> iterator() {
-        ByteBuffer copy = this.buffer.duplicate();
-        return new RecordsIterator(copy, CompressionType.NONE, false);
+        if (writable) {
+            // flip on a duplicate buffer for reading
+            return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), CompressionType.NONE, false);
+        } else {
+            // do not need to flip for non-writable buffer
+            return new RecordsIterator(this.buffer.duplicate(), CompressionType.NONE, false);
+        }
     }
     
     @Override
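
Since close() now flips the buffer and iterator() works on a duplicate, the test changes further below can drop the removed flip() call before reading a batch back. A small sketch of reading entries out of a closed batch; the two appended records are arbitrary.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class IterateClosedRecordsSketch {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
            records.append(0L, "a".getBytes(), "1".getBytes());
            records.append(1L, "b".getBytes(), "2".getBytes());
            records.close();

            // iterator() duplicates the already-flipped buffer, so iteration never disturbs
            // the buffer handed to the network layer and needs no flip() from the caller
            for (LogEntry entry : records)
                System.out.println(entry.offset() + " -> " + entry.record().value().remaining() + " value bytes");
        }
    }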

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index aa15561..d43cdab 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
 /**
  * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
  * for the in-memory representation.
@@ -30,16 +27,8 @@ public interface Records extends Iterable<LogEntry> {
     int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
 
     /**
-     * Write these records to the given channel
-     * @param channel The channel to write to
-     * @return The number of bytes written
-     * @throws IOException If the write fails.
-     */
-    public int writeTo(GatheringByteChannel channel) throws IOException;
-
-    /**
      * The size of these records in bytes
      */
-    public int sizeInBytes();
+    int sizeInBytes();
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 957d8f9..1e7a215 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -98,7 +98,6 @@ public class FetcherTest {
         records.append(2L, "key".getBytes(), "value-2".getBytes());
         records.append(3L, "key".getBytes(), "value-3".getBytes());
         records.close();
-        records.flip();
     }
 
     @After
@@ -142,7 +141,7 @@ public class FetcherTest {
 
         // resize the limit of the buffer to pretend it is only fetch-size large
         fetcher.initFetches(cluster);
-        client.prepareResponse(fetchResponse((ByteBuffer) records.flip().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse((ByteBuffer) records.buffer().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         fetcher.fetchedRecords();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 887499d..132b83b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -87,7 +87,7 @@ public class RecordAccumulatorTest {
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
-        batch.records.flip();
+
         Iterator<LogEntry> iter = batch.records.iterator();
         for (int i = 0; i < appends; i++) {
             LogEntry entry = iter.next();
@@ -116,7 +116,7 @@ public class RecordAccumulatorTest {
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
-        batch.records.flip();
+
         Iterator<LogEntry> iter = batch.records.iterator();
         LogEntry entry = iter.next();
         assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -169,7 +169,6 @@ public class RecordAccumulatorTest {
             List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
-                    batch.records.flip();
                     for (LogEntry entry : batch.records)
                         read++;
                     accum.deallocate(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6fe164d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 8ec610a..e343327 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -50,9 +50,7 @@ public class MemoryRecordsTest {
             recs2.append(i, toArray(r.key()), toArray(r.value()));
         }
         recs1.close();
-        recs1.flip();
         recs2.close();
-        recs2.flip();
 
         for (int iteration = 0; iteration < 2; iteration++) {
             for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {