You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/12 03:41:53 UTC

kafka git commit: KAFKA2805; RecordAccumulator request timeout not enforced when all brokers are gone

Repository: kafka
Updated Branches:
  refs/heads/trunk 124f73b17 -> 1cd22ed33


KAFKA2805; RecordAccumulator request timeout not enforced when all brokers are gone

Removed the check for expiring only those batches whose metadata is unavailable. Now the batches will be expired irrespective of whether the leader is available or not, as soon as it reaches the requestimeout threshold.

Author: Mayuresh Gharat <mg...@mgharat-ld1.linkedin.biz>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #503 from MayureshGharat/kafka-2805


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd22ed3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd22ed3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd22ed3

Branch: refs/heads/trunk
Commit: 1cd22ed33f1e1f63d8cb63f68309e5d8f43be1e1
Parents: 124f73b
Author: Mayuresh Gharat <mg...@mgharat-ld1.linkedin.biz>
Authored: Wed Nov 11 18:41:45 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 11 18:41:45 2015 -0800

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd22ed3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index db61121..d4a8a23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -216,18 +216,15 @@ public final class RecordAccumulator {
                 Iterator<RecordBatch> batchIterator = dq.iterator();
                 while (batchIterator.hasNext()) {
                     RecordBatch batch = batchIterator.next();
-                    Node leader = cluster.leaderFor(topicAndPartition);
-                    if (leader == null) {
-                        // check if the batch is expired
-                        if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
-                            expiredBatches.add(batch);
-                            count++;
-                            batchIterator.remove();
-                            deallocate(batch);
-                        } else {
-                            if (!batch.inRetry()) {
-                                break;
-                            }
+                    // check if the batch is expired
+                    if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
+                        expiredBatches.add(batch);
+                        count++;
+                        batchIterator.remove();
+                        deallocate(batch);
+                    } else {
+                        if (!batch.inRetry()) {
+                            break;
                         }
                     }
                 }