You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/31 23:23:30 UTC

kafka git commit: KAFKA-2486; fix performance regression in new consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk 835495996 -> 13c432f79


KAFKA-2486; fix performance regression in new consumer

The sleep() in KafkaConsumer's poll() blocked any pending I/O from completing and created a performance bottleneck. It was intended to implement fetch backoff, but that rested on a misunderstanding of the "retry.backoff.ms" setting, which should only affect failed fetches.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava, Gwen Shapira

Closes #180 from hachikuji/KAFKA-2486


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13c432f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13c432f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13c432f7

Branch: refs/heads/trunk
Commit: 13c432f7952de27e9bf8cb4adb33a91ae3a4b738
Parents: 8354959
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Aug 31 14:23:23 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Aug 31 14:23:23 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/13c432f7/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 938981c..8cd285c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -32,9 +32,9 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -58,8 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.kafka.common.utils.Utils.min;
-
 /**
  * A Kafka client that consumes records from a Kafka cluster.
  * <p>
@@ -729,7 +727,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             while (remaining >= 0) {
                 long start = time.milliseconds();
                 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
-                long end = time.milliseconds();
 
                 if (!records.isEmpty()) {
                     // if data is available, then return it, but first send off the
@@ -740,13 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     return new ConsumerRecords<>(records);
                 }
 
-                remaining -= end - start;
-
-                // nothing was available, so we should backoff before retrying
-                if (remaining > 0) {
-                    Utils.sleep(min(remaining, retryBackoffMs));
-                    remaining -= time.milliseconds() - end;
-                }
+                remaining -= time.milliseconds() - start;
             }
 
             return ConsumerRecords.empty();