You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:18 UTC
[02/50] [abbrv] kafka git commit: MINOR: Fix typo and tweak wording
in `RecordAccumulator` comments
MINOR: Fix typo and tweak wording in `RecordAccumulator` comments
This was recently introduced in:
https://github.com/apache/kafka/commit/1fbe445dde71df0023a978c5e54dd229d3d23e1b
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #1152 from ijuma/fix-typos-in-record-accumulator
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c0660bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c0660bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c0660bf
Branch: refs/heads/0.10.0
Commit: 4c0660bf3da9879cb405a0f85cf1524511e091e8
Parents: 1fbe445
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Mar 28 09:00:03 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Mar 28 09:00:03 2016 -0700
----------------------------------------------------------------------
.../clients/producer/internals/RecordAccumulator.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c0660bf/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 915c4d3..7f5b16f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -218,14 +218,14 @@ public final class RecordAccumulator {
int count = 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> dq = entry.getValue();
- // We only check if the batch should be expired if the partition does not have a batch in flight.
- // This is to avoid the later batches get expired when an earlier batch is still in progress.
- // This protection only takes effect when user sets max.in.flight.request.per.connection=1.
- // Otherwise the expiration order is not guranteed.
TopicPartition tp = entry.getKey();
+ // We only check if the batch should be expired if the partition does not have a batch in flight.
+ // This is to prevent later batches from being expired while an earlier batch is still in progress.
+ // Note that `muted` is only ever populated if `max.in.flight.requests.per.connection=1` so this protection
+ // is only active in this case. Otherwise the expiration order is not guaranteed.
if (!muted.contains(tp)) {
synchronized (dq) {
- // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut
+ // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
RecordBatch lastBatch = dq.peekLast();
Iterator<RecordBatch> batchIterator = dq.iterator();
while (batchIterator.hasNext()) {