Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/30 21:33:43 UTC

kafka git commit: KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy

Repository: kafka
Updated Branches:
  refs/heads/trunk 8d188c911 -> f9d7808ba


KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>

Closes #2190 from hachikuji/KAFKA-4469
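
Context for the diff below: the inefficiency named in the subject is in PartitionRecords.take(n), which, when the poll limit was smaller than the fetched batch, removed records one at a time with Iterator.remove() and collected them into a fresh list. Each front removal from an ArrayList shifts the remaining tail, so draining a large batch in maxPollRecords-sized chunks degrades to quadratic work. The replacement keeps a position index into the parsed list and returns an unmodifiable subList view, which drains in constant time and copies nothing. A minimal standalone sketch of the two approaches (DrainSketch is illustrative only, not a Kafka class; the two methods are meant to be read side by side, not mixed on one instance):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    class DrainSketch<T> {
        private final List<T> records;
        private int position = 0;

        DrainSketch(List<T> records) {
            this.records = records;
        }

        // Old style: pull up to n elements off the front with Iterator.remove().
        // Each removal from an ArrayList shifts the remaining elements left,
        // and the drained elements are copied into a new list as well.
        List<T> takeByRemoval(int n) {
            List<T> res = new ArrayList<>(n);
            Iterator<T> it = records.iterator();
            for (int i = 0; i < n && it.hasNext(); i++) {
                res.add(it.next());
                it.remove();
            }
            return res;
        }

        // New style: advance an index and return a read-only view of the backing list.
        // Nothing is shifted or copied; the caller must not mutate the result.
        List<T> drainBySubList(int n) {
            if (position >= records.size())
                return Collections.emptyList();
            int limit = Math.min(records.size(), position + n);
            List<T> res = Collections.unmodifiableList(records.subList(position, limit));
            position = limit;
            return res;
        }
    }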


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9d7808b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9d7808b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9d7808b

Branch: refs/heads/trunk
Commit: f9d7808bab52a0a6fa879aaac0b1da80e0f33adb
Parents: 8d188c9
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 30 13:18:04 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 30 13:18:04 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 82 ++++++++++----------
 1 file changed, 41 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9d7808b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 703ea29..e414fcb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,7 +61,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -416,25 +415,40 @@ public class Fetcher<K, V> {
         int recordsRemaining = maxPollRecords;
 
         while (recordsRemaining > 0) {
-            if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
                 CompletedFetch completedFetch = completedFetches.poll();
                 if (completedFetch == null)
                     break;
 
                 nextInLineRecords = parseFetchedData(completedFetch);
             } else {
-                recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
+                TopicPartition partition = nextInLineRecords.partition;
+
+                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+                if (!records.isEmpty()) {
+                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+                    if (currentRecords == null) {
+                        drained.put(partition, records);
+                    } else {
+                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
+                        // but it might conceivably happen in some rare cases (such as partition leader changes).
+                        // we have to copy to a new list because the old one may be immutable
+                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+                        newRecords.addAll(currentRecords);
+                        newRecords.addAll(records);
+                        drained.put(partition, newRecords);
+                    }
+                    recordsRemaining -= records.size();
+                }
             }
         }
 
         return drained;
     }
 
-    private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
-                       PartitionRecords<K, V> partitionRecords,
-                       int maxRecords) {
-        if (partitionRecords.isEmpty())
-            return 0;
+    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
+        if (partitionRecords.isDrained())
+            return Collections.emptyList();
 
         if (!subscriptions.isAssigned(partitionRecords.partition)) {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
@@ -447,22 +461,14 @@ public class Fetcher<K, V> {
                 log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
             } else if (partitionRecords.fetchOffset == position) {
                 // we are ensured to have at least one record since we already checked for emptiness
-                List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
+                List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
                 long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
 
                 log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                         "position to {}", position, partitionRecords.partition, nextOffset);
 
-                List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
-                if (records == null) {
-                    records = partRecords;
-                    drained.put(partitionRecords.partition, records);
-                } else {
-                    records.addAll(partRecords);
-                }
-
                 subscriptions.position(partitionRecords.partition, nextOffset);
-                return partRecords.size();
+                return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
                 // they must be from an obsolete request
@@ -471,8 +477,8 @@ public class Fetcher<K, V> {
             }
         }
 
-        partitionRecords.discard();
-        return 0;
+        partitionRecords.drain();
+        return Collections.emptyList();
     }
 
     /**
@@ -600,7 +606,7 @@ public class Fetcher<K, V> {
 
     private List<TopicPartition> fetchablePartitions() {
         List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
+        if (nextInLineRecords != null && !nextInLineRecords.isDrained())
             fetchable.remove(nextInLineRecords.partition);
         for (CompletedFetch completedFetch : completedFetches)
             fetchable.remove(completedFetch.partition);
@@ -768,6 +774,7 @@ public class Fetcher<K, V> {
         private long fetchOffset;
         private TopicPartition partition;
         private List<ConsumerRecord<K, V>> records;
+        private int position = 0;
 
         public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
             this.fetchOffset = fetchOffset;
@@ -775,33 +782,26 @@ public class Fetcher<K, V> {
             this.records = records;
         }
 
-        private boolean isEmpty() {
-            return records == null || records.isEmpty();
+        private boolean isDrained() {
+            return records == null || position >= records.size();
         }
 
-        private void discard() {
+        private void drain() {
             this.records = null;
         }
 
-        private List<ConsumerRecord<K, V>> take(int n) {
-            if (records == null)
-                return new ArrayList<>();
-
-            if (n >= records.size()) {
-                List<ConsumerRecord<K, V>> res = this.records;
-                this.records = null;
-                return res;
-            }
+        private List<ConsumerRecord<K, V>> drainRecords(int n) {
+            if (isDrained())
+                return Collections.emptyList();
 
-            List<ConsumerRecord<K, V>> res = new ArrayList<>(n);
-            Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
-            for (int i = 0; i < n; i++) {
-                res.add(iterator.next());
-                iterator.remove();
-            }
+            // using a sublist avoids a potentially expensive list copy (depending on the size of the records
+            // and the maximum we can return from poll). The cost is that we cannot mutate the returned sublist.
+            int limit = Math.min(records.size(), position + n);
+            List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(records.subList(position, limit));
 
-            if (iterator.hasNext())
-                this.fetchOffset = iterator.next().offset();
+            position = limit;
+            if (position < records.size())
+                fetchOffset = records.get(position).offset();
 
             return res;
         }