You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/22 00:13:29 UTC

kafka git commit: KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false

Repository: kafka
Updated Branches:
  refs/heads/trunk 74e6dc842 -> 327400449


KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false

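This exception occurs when the producer tries to append a record to a re-enqueued record batch in the accumulator. We should not allow a record to be added to a re-enqueued record batch. This is due to a bug in the hasRoomFor() method of MemoryRecords.java: because `&&` binds more tightly than `?:`, the old expression was evaluated as `(writable && numRecordsWritten() == 0) ? ... : ...`, so a closed (non-writable) instance fell through to the write-limit check instead of returning false. After MemoryRecords.close() has been called, hasRoomFor() should return false.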

Author: Manikumar reddy O <ma...@gmail.com>

Reviewers: Ismael Juma, Grant Henke, Guozhang Wang

Closes #1249 from omkreddy/KAFKA-3594


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32740044
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32740044
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32740044

Branch: refs/heads/trunk
Commit: 32740044972ad3bfc9539b5d76128dceddedc2ba
Parents: 74e6dc8
Author: Manikumar reddy O <ma...@gmail.com>
Authored: Thu Apr 21 15:13:25 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 21 15:13:25 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/MemoryRecords.java    |  5 ++++-
 .../apache/kafka/common/record/MemoryRecordsTest.java    | 11 +++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/32740044/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index f37ef39..7175953 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -115,7 +115,10 @@ public class MemoryRecords implements Records {
      * to accept this single record.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.writable && this.compressor.numRecordsWritten() == 0 ?
+        if (!this.writable)
+            return false;
+
+        return this.compressor.numRecordsWritten() == 0 ?
             this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
             this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/32740044/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index ed64f63..b1117f1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -71,6 +71,17 @@ public class MemoryRecordsTest {
         }
     }
 
+    @Test
+    public void testHasRoomForMethod() {
+        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
+        recs1.append(0, new Record(0L, "a".getBytes(), "1".getBytes()));
+
+        assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+        recs1.close();
+        assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+
+    }
+
     @Parameterized.Parameters
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<Object[]>();