You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/02 02:14:33 UTC

kafka git commit: MINOR: small code optimizations in streams

Repository: kafka
Updated Branches:
  refs/heads/trunk 75ec67eda -> bd5325dd8


MINOR: small code optimizations in streams

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1176 from ymatsuda/optimize


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd5325dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd5325dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd5325dd

Branch: refs/heads/trunk
Commit: bd5325dd8be7c5cf920acee2aa33b3c288bd551a
Parents: 75ec67e
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Apr 1 17:14:29 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 1 17:14:29 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/PartitionGroup.java     |  6 +++-
 .../streams/processor/internals/StreamTask.java | 12 +++----
 .../processor/internals/StreamThread.java       | 35 ++++++++------------
 3 files changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd5325dd/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index b487ff5..3d8f792 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -49,6 +49,10 @@ public class PartitionGroup {
         public TopicPartition partition() {
             return queue.partition();
         }
+
+        public RecordQueue queue() {
+            return queue;
+        }
     }
 
     // since task is thread-safe, we do not need to synchronize on local variables
@@ -88,7 +92,7 @@ public class PartitionGroup {
             // get the first record from this queue.
             record = queue.poll();
 
-            if (queue.size() > 0) {
+            if (!queue.isEmpty()) {
                 queuesByTime.offer(queue);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd5325dd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index afa303c..61aeced 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -179,7 +179,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
                 // after processing this record, if its partition queue's buffered size has been
                 // decreased to the threshold, we can then resume the consumption on this partition
-                if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
+                if (recordInfo.queue().size() == this.maxBufferedSize) {
                     consumer.resume(singleton(partition));
                     requiresPoll = true;
                 }
@@ -320,13 +320,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value) {
         ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currNode = childNode;
-            try {
+        try {
+            for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+                currNode = childNode;
                 childNode.process(key, value);
-            } finally {
-                currNode = thisNode;
             }
+        } finally {
+            currNode = thisNode;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd5325dd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d6b98f..c2a8e06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -350,9 +350,12 @@ public class StreamThread extends Thread {
                     requiresPoll = requiresPoll || task.requiresPoll();
 
                     sensors.processTimeSensor.record(time.milliseconds() - startProcess);
-                }
 
-                maybePunctuate();
+                    maybePunctuate(task);
+
+                    if (task.commitNeeded())
+                        commitOne(task, time.milliseconds());
+                }
 
                 // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
                 // even when we paused all partitions.
@@ -424,18 +427,16 @@ public class StreamThread extends Thread {
         return true;
     }
 
-    private void maybePunctuate() {
-        for (StreamTask task : activeTasks.values()) {
-            try {
-                long now = time.milliseconds();
+    private void maybePunctuate(StreamTask task) {
+        try {
+            long now = time.milliseconds();
 
-                if (task.maybePunctuate(now))
-                    sensors.punctuateTimeSensor.record(time.milliseconds() - now);
+            if (task.maybePunctuate(now))
+                sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
-            } catch (KafkaException e) {
-                log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
+        } catch (KafkaException e) {
+            log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            throw e;
         }
     }
 
@@ -449,16 +450,6 @@ public class StreamThread extends Thread {
             lastCommit = now;
 
             processStandbyRecords = true;
-        } else {
-            for (StreamTask task : activeTasks.values()) {
-                try {
-                    if (task.commitNeeded())
-                        commitOne(task, time.milliseconds());
-                } catch (KafkaException e) {
-                    log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                    throw e;
-                }
-            }
         }
     }