You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:17 UTC
[03/50] [abbrv] kafka git commit: KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false
KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false
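This exception occurs when the producer tries to append a record to a re-enqueued record batch in the accumulator. We should not allow a record to be added to a re-enqueued record batch. This is due to a bug in the MemoryRecords.hasRoomFor() method: because && binds more tightly than ?:, the old expression used "this.writable && numRecordsWritten() == 0" as the whole ternary condition, so a closed (non-writable) batch fell through to the writeLimit check and could still report room. After MemoryRecords.close() has been called, hasRoomFor() should return false.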
Author: Manikumar reddy O <ma...@gmail.com>
Reviewers: Ismael Juma, Grant Henke, Guozhang Wang
Closes #1249 from omkreddy/KAFKA-3594
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32740044
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32740044
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32740044
Branch: refs/heads/0.10.0
Commit: 32740044972ad3bfc9539b5d76128dceddedc2ba
Parents: 74e6dc8
Author: Manikumar reddy O <ma...@gmail.com>
Authored: Thu Apr 21 15:13:25 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 21 15:13:25 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/common/record/MemoryRecords.java | 5 ++++-
.../apache/kafka/common/record/MemoryRecordsTest.java | 11 +++++++++++
2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32740044/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index f37ef39..7175953 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -115,7 +115,10 @@ public class MemoryRecords implements Records {
* to accept this single record.
*/
public boolean hasRoomFor(byte[] key, byte[] value) {
- return this.writable && this.compressor.numRecordsWritten() == 0 ?
+ if (!this.writable)
+ return false;
+
+ return this.compressor.numRecordsWritten() == 0 ?
this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/32740044/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index ed64f63..b1117f1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -71,6 +71,17 @@ public class MemoryRecordsTest {
}
}
+ @Test
+ public void testHasRoomForMethod() {
+ MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
+ recs1.append(0, new Record(0L, "a".getBytes(), "1".getBytes()));
+
+ assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+ recs1.close();
+ assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+
+ }
+
@Parameterized.Parameters
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<Object[]>();