You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/31 23:23:30 UTC
kafka git commit: KAFKA-2486; fix performance regression in new consumer
Repository: kafka
Updated Branches:
refs/heads/trunk 835495996 -> 13c432f79
KAFKA-2486; fix performance regression in new consumer
The sleep() in KafkaConsumer's poll() blocked any pending IO from being completed and created a performance bottleneck. It was intended to implement the fetch backoff behavior, but that was a misunderstanding of the setting "retry.backoff.ms", which should only affect failed fetches.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava, Gwen Shapira
Closes #180 from hachikuji/KAFKA-2486
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13c432f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13c432f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13c432f7
Branch: refs/heads/trunk
Commit: 13c432f7952de27e9bf8cb4adb33a91ae3a4b738
Parents: 8354959
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Aug 31 14:23:23 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Aug 31 14:23:23 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/KafkaConsumer.java | 15 +++------------
1 file changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/13c432f7/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 938981c..8cd285c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -32,9 +32,9 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -58,8 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.kafka.common.utils.Utils.min;
-
/**
* A Kafka client that consumes records from a Kafka cluster.
* <p>
@@ -729,7 +727,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
while (remaining >= 0) {
long start = time.milliseconds();
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
- long end = time.milliseconds();
if (!records.isEmpty()) {
// if data is available, then return it, but first send off the
@@ -740,13 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
return new ConsumerRecords<>(records);
}
- remaining -= end - start;
-
- // nothing was available, so we should backoff before retrying
- if (remaining > 0) {
- Utils.sleep(min(remaining, retryBackoffMs));
- remaining -= time.milliseconds() - end;
- }
+ remaining -= time.milliseconds() - start;
}
return ConsumerRecords.empty();