You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/15 20:02:14 UTC
kafka git commit: KAFKA-2654: optimize unnecessary poll(0) away in StreamTask
Repository: kafka
Updated Branches:
refs/heads/trunk 28e59a1df -> c50d39ea8
KAFKA-2654: optimize unnecessary poll(0) away in StreamTask
guozhangwang
This change aims to remove unnecessary `consumer.poll(0)` calls. With this change, a task requests a poll only:
* once after a partition is resumed;
* whenever the size of the top queue in any task is at or below `BUFFERED_RECORDS_PER_PARTITION_CONFIG`;
* whenever a task has no buffered record left to process.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #315 from ymatsuda/less_poll_zero
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c50d39ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c50d39ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c50d39ea
Branch: refs/heads/trunk
Commit: c50d39ea82e282d08163ebe7f0e398de2dc8c128
Parents: 28e59a1
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Oct 15 11:06:51 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 15 11:06:51 2015 -0700
----------------------------------------------------------------------
.../processor/internals/PartitionGroup.java | 5 ++++
.../streams/processor/internals/StreamTask.java | 17 ++++++++++++-
.../processor/internals/StreamThread.java | 25 ++++++++++++--------
3 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c50d39ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 44a6c5c..d888085 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -154,6 +154,11 @@ public class PartitionGroup {
return recordQueue.size();
}
+ public int topQueueSize() {
+ RecordQueue recordQueue = queuesByTime.peek();
+ return (recordQueue == null) ? 0 : recordQueue.size();
+ }
+
public int numBuffered() {
return totalBuffered;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c50d39ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a94202f..0ceec52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -64,6 +64,8 @@ public class StreamTask implements Punctuator {
private StampedRecord currRecord = null;
private ProcessorNode currNode = null;
+ private boolean requiresPoll = true;
+
/**
* Create {@link StreamTask} with its assigned partitions
*
@@ -173,8 +175,12 @@ public class StreamTask implements Punctuator {
StampedRecord record = partitionGroup.nextRecord(recordInfo);
// if there is no record to process, return immediately
- if (record == null)
+ if (record == null) {
+ requiresPoll = true;
return 0;
+ }
+
+ requiresPoll = false;
try {
// process the record by passing to the source node of the topology
@@ -196,6 +202,11 @@ public class StreamTask implements Punctuator {
// decreased to the threshold, we can then resume the consumption on this partition
if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
consumer.resume(partition);
+ requiresPoll = true;
+ }
+
+ if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
+ requiresPoll = true;
}
} finally {
this.currRecord = null;
@@ -206,6 +217,10 @@ public class StreamTask implements Punctuator {
}
}
+ public boolean requiresPoll() {
+ return requiresPoll;
+ }
+
/**
* Possibly trigger registered punctuation functions if
* current time has reached the defined stamp
http://git-wip-us.apache.org/repos/asf/kafka/blob/c50d39ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 4a68332..7d935eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -231,33 +231,38 @@ public class StreamThread extends Thread {
private void runLoop() {
try {
int totalNumBuffered = 0;
+ boolean requiresPoll = true;
consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
while (stillRunning()) {
- long startPoll = time.milliseconds();
-
// try to fetch some records if necessary
- ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
+ if (requiresPoll) {
+ long startPoll = time.milliseconds();
+
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
- if (!records.isEmpty()) {
- for (StreamTask task : tasks.values()) {
- for (TopicPartition partition : task.partitions()) {
- task.addRecords(partition, records.records(partition));
+ if (!records.isEmpty()) {
+ for (StreamTask task : tasks.values()) {
+ for (TopicPartition partition : task.partitions()) {
+ task.addRecords(partition, records.records(partition));
+ }
}
}
- }
- long endPoll = time.milliseconds();
- sensors.pollTimeSensor.record(endPoll - startPoll);
+ long endPoll = time.milliseconds();
+ sensors.pollTimeSensor.record(endPoll - startPoll);
+ }
// try to process one record from each task
totalNumBuffered = 0;
+ requiresPoll = false;
for (StreamTask task : tasks.values()) {
long startProcess = time.milliseconds();
totalNumBuffered += task.process();
+ requiresPoll = requiresPoll || task.requiresPoll();
sensors.processTimeSensor.record(time.milliseconds() - startProcess);
}