You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/12 10:15:52 UTC

kafka git commit: KAFKA-3147; Memory records is not writable in MirrorMaker

Repository: kafka
Updated Branches:
  refs/heads/trunk f141e647a -> 85599bc3e


KAFKA-3147; Memory records is not writable in MirrorMaker

When aborting batches, remove each batch from the RecordAccumulator once it's closed. Also make sure the RecordAccumulator does not accept new appends while the producer is being closed.

Author: Mayuresh Gharat <mg...@mgharat-ld1.linkedin.biz>

Reviewers: Jiangjie Qin, Ismael Juma, Guozhang Wang

Closes #825 from MayureshGharat/KAFKA-3147


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85599bc3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85599bc3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85599bc3

Branch: refs/heads/trunk
Commit: 85599bc3e861c77cd91c55273debe396da85deeb
Parents: f141e64
Author: Mayuresh Gharat <mg...@linkedin.com>
Authored: Fri Feb 12 17:15:41 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Feb 12 17:15:41 2016 +0800

----------------------------------------------------------------------
 .../kafka/clients/producer/internals/RecordAccumulator.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/85599bc3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d36234c..3c710c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -156,11 +156,11 @@ public final class RecordAccumulator {
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
         try {
-            if (closed)
-                throw new IllegalStateException("Cannot send after the producer is closed.");
             // check if we have an in-progress batch
             Deque<RecordBatch> dq = dequeFor(tp);
             synchronized (dq) {
+                if (closed)
+                    throw new IllegalStateException("Cannot send after the producer is closed.");
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
                     FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
@@ -452,6 +452,7 @@ public final class RecordAccumulator {
             // Close the batch before aborting
             synchronized (dq) {
                 batch.records.close();
+                dq.remove(batch);
             }
             batch.done(-1L, new IllegalStateException("Producer is closed forcefully."));
             deallocate(batch);