You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/30 21:33:43 UTC
kafka git commit: KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy
Repository: kafka
Updated Branches:
refs/heads/trunk 8d188c911 -> f9d7808ba
KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
Closes #2190 from hachikuji/KAFKA-4469
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9d7808b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9d7808b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9d7808b
Branch: refs/heads/trunk
Commit: f9d7808bab52a0a6fa879aaac0b1da80e0f33adb
Parents: 8d188c9
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 30 13:18:04 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 30 13:18:04 2016 -0800
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 82 ++++++++++----------
1 file changed, 41 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f9d7808b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 703ea29..e414fcb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,7 +61,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -416,25 +415,40 @@ public class Fetcher<K, V> {
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
- if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+ if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
nextInLineRecords = parseFetchedData(completedFetch);
} else {
- recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
+ TopicPartition partition = nextInLineRecords.partition;
+
+ List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+ if (!records.isEmpty()) {
+ List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+ if (currentRecords == null) {
+ drained.put(partition, records);
+ } else {
+ // this case shouldn't usually happen because we only send one fetch at a time per partition,
+ // but it might conceivably happen in some rare cases (such as partition leader changes).
+ // we have to copy to a new list because the old one may be immutable
+ List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+ newRecords.addAll(currentRecords);
+ newRecords.addAll(records);
+ drained.put(partition, newRecords);
+ }
+ recordsRemaining -= records.size();
+ }
}
}
return drained;
}
- private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
- PartitionRecords<K, V> partitionRecords,
- int maxRecords) {
- if (partitionRecords.isEmpty())
- return 0;
+ private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
+ if (partitionRecords.isDrained())
+ return Collections.emptyList();
if (!subscriptions.isAssigned(partitionRecords.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
@@ -447,22 +461,14 @@ public class Fetcher<K, V> {
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
} else if (partitionRecords.fetchOffset == position) {
// we are ensured to have at least one record since we already checked for emptiness
- List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
+ List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
- List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
- if (records == null) {
- records = partRecords;
- drained.put(partitionRecords.partition, records);
- } else {
- records.addAll(partRecords);
- }
-
subscriptions.position(partitionRecords.partition, nextOffset);
- return partRecords.size();
+ return partRecords;
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
@@ -471,8 +477,8 @@ public class Fetcher<K, V> {
}
}
- partitionRecords.discard();
- return 0;
+ partitionRecords.drain();
+ return Collections.emptyList();
}
/**
@@ -600,7 +606,7 @@ public class Fetcher<K, V> {
private List<TopicPartition> fetchablePartitions() {
List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
- if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
+ if (nextInLineRecords != null && !nextInLineRecords.isDrained())
fetchable.remove(nextInLineRecords.partition);
for (CompletedFetch completedFetch : completedFetches)
fetchable.remove(completedFetch.partition);
@@ -768,6 +774,7 @@ public class Fetcher<K, V> {
private long fetchOffset;
private TopicPartition partition;
private List<ConsumerRecord<K, V>> records;
+ private int position = 0;
public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
this.fetchOffset = fetchOffset;
@@ -775,33 +782,26 @@ public class Fetcher<K, V> {
this.records = records;
}
- private boolean isEmpty() {
- return records == null || records.isEmpty();
+ private boolean isDrained() {
+ return records == null || position >= records.size();
}
- private void discard() {
+ private void drain() {
this.records = null;
}
- private List<ConsumerRecord<K, V>> take(int n) {
- if (records == null)
- return new ArrayList<>();
-
- if (n >= records.size()) {
- List<ConsumerRecord<K, V>> res = this.records;
- this.records = null;
- return res;
- }
+ private List<ConsumerRecord<K, V>> drainRecords(int n) {
+ if (isDrained())
+ return Collections.emptyList();
- List<ConsumerRecord<K, V>> res = new ArrayList<>(n);
- Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
- for (int i = 0; i < n; i++) {
- res.add(iterator.next());
- iterator.remove();
- }
+ // using a sublist avoids a potentially expensive list copy (depending on the size of the records
+ // and the maximum we can return from poll). The cost is that we cannot mutate the returned sublist.
+ int limit = Math.min(records.size(), position + n);
+ List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(records.subList(position, limit));
- if (iterator.hasNext())
- this.fetchOffset = iterator.next().offset();
+ position = limit;
+ if (position < records.size())
+ fetchOffset = records.get(position).offset();
return res;
}