You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/15 20:02:14 UTC

kafka git commit: KAFKA-2654: optimize unnecessary poll(0) away in StreamTask

Repository: kafka
Updated Branches:
  refs/heads/trunk 28e59a1df -> c50d39ea8


KAFKA-2654: optimize unnecessary poll(0) away in StreamTask

guozhangwang
This change aims to remove unnecessary ```consumer.poll(0)``` calls. With this change, the stream thread polls the consumer only when some task requires it:
* ```once``` after some partition is resumed
* whenever the size of the top queue in any task is below ```BUFFERED_RECORDS_PER_PARTITION_CONFIG```

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #315 from ymatsuda/less_poll_zero


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c50d39ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c50d39ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c50d39ea

Branch: refs/heads/trunk
Commit: c50d39ea82e282d08163ebe7f0e398de2dc8c128
Parents: 28e59a1
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Oct 15 11:06:51 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 15 11:06:51 2015 -0700

----------------------------------------------------------------------
 .../processor/internals/PartitionGroup.java     |  5 ++++
 .../streams/processor/internals/StreamTask.java | 17 ++++++++++++-
 .../processor/internals/StreamThread.java       | 25 ++++++++++++--------
 3 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c50d39ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 44a6c5c..d888085 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -154,6 +154,11 @@ public class PartitionGroup {
         return recordQueue.size();
     }
 
+    public int topQueueSize() {
+        RecordQueue recordQueue = queuesByTime.peek();
+        return (recordQueue == null) ? 0 : recordQueue.size();
+    }
+
     public int numBuffered() {
         return totalBuffered;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50d39ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a94202f..0ceec52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -64,6 +64,8 @@ public class StreamTask implements Punctuator {
     private StampedRecord currRecord = null;
     private ProcessorNode currNode = null;
 
+    private boolean requiresPoll = true;
+
     /**
      * Create {@link StreamTask} with its assigned partitions
      *
@@ -173,8 +175,12 @@ public class StreamTask implements Punctuator {
             StampedRecord record = partitionGroup.nextRecord(recordInfo);
 
             // if there is no record to process, return immediately
-            if (record == null)
+            if (record == null) {
+                requiresPoll = true;
                 return 0;
+            }
+
+            requiresPoll = false;
 
             try {
                 // process the record by passing to the source node of the topology
@@ -196,6 +202,11 @@ public class StreamTask implements Punctuator {
                 // decreased to the threshold, we can then resume the consumption on this partition
                 if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
                     consumer.resume(partition);
+                    requiresPoll = true;
+                }
+
+                if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
+                    requiresPoll = true;
                 }
             } finally {
                 this.currRecord = null;
@@ -206,6 +217,10 @@ public class StreamTask implements Punctuator {
         }
     }
 
+    public boolean requiresPoll() {
+        return requiresPoll;
+    }
+
     /**
      * Possibly trigger registered punctuation functions if
      * current time has reached the defined stamp

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50d39ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 4a68332..7d935eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -231,33 +231,38 @@ public class StreamThread extends Thread {
     private void runLoop() {
         try {
             int totalNumBuffered = 0;
+            boolean requiresPoll = true;
 
             consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
 
             while (stillRunning()) {
-                long startPoll = time.milliseconds();
-
                 // try to fetch some records if necessary
-                ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
+                if (requiresPoll) {
+                    long startPoll = time.milliseconds();
+
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
 
-                if (!records.isEmpty()) {
-                    for (StreamTask task : tasks.values()) {
-                        for (TopicPartition partition : task.partitions()) {
-                            task.addRecords(partition, records.records(partition));
+                    if (!records.isEmpty()) {
+                        for (StreamTask task : tasks.values()) {
+                            for (TopicPartition partition : task.partitions()) {
+                                task.addRecords(partition, records.records(partition));
+                            }
                         }
                     }
-                }
 
-                long endPoll = time.milliseconds();
-                sensors.pollTimeSensor.record(endPoll - startPoll);
+                    long endPoll = time.milliseconds();
+                    sensors.pollTimeSensor.record(endPoll - startPoll);
+                }
 
                 // try to process one record from each task
                 totalNumBuffered = 0;
+                requiresPoll = false;
 
                 for (StreamTask task : tasks.values()) {
                     long startProcess = time.milliseconds();
 
                     totalNumBuffered += task.process();
+                    requiresPoll = requiresPoll || task.requiresPoll();
 
                     sensors.processTimeSensor.record(time.milliseconds() - startProcess);
                 }