You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/12 10:15:52 UTC
kafka git commit: KAFKA-3147; Memory records is not writable in MirrorMaker
Repository: kafka
Updated Branches:
refs/heads/trunk f141e647a -> 85599bc3e
KAFKA-3147; Memory records is not writable in MirrorMaker
Remove the batch from the RecordAccumulator once it's closed while aborting batches. Make sure we don't accept new batch appends to the RecordAccumulator while the producer is being closed.
Author: Mayuresh Gharat <mg...@mgharat-ld1.linkedin.biz>
Reviewers: Jiangjie Qin, Ismael Juma, Guozhang Wang
Closes #825 from MayureshGharat/KAFKA-3147
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85599bc3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85599bc3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85599bc3
Branch: refs/heads/trunk
Commit: 85599bc3e861c77cd91c55273debe396da85deeb
Parents: f141e64
Author: Mayuresh Gharat <mg...@linkedin.com>
Authored: Fri Feb 12 17:15:41 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Feb 12 17:15:41 2016 +0800
----------------------------------------------------------------------
.../kafka/clients/producer/internals/RecordAccumulator.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/85599bc3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d36234c..3c710c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -156,11 +156,11 @@ public final class RecordAccumulator {
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
try {
- if (closed)
- throw new IllegalStateException("Cannot send after the producer is closed.");
// check if we have an in-progress batch
Deque<RecordBatch> dq = dequeFor(tp);
synchronized (dq) {
+ if (closed)
+ throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
@@ -452,6 +452,7 @@ public final class RecordAccumulator {
// Close the batch before aborting
synchronized (dq) {
batch.records.close();
+ dq.remove(batch);
}
batch.done(-1L, new IllegalStateException("Producer is closed forcefully."));
deallocate(batch);