You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/02 02:14:33 UTC
kafka git commit: MINOR: small code optimizations in streams
Repository: kafka
Updated Branches:
refs/heads/trunk 75ec67eda -> bd5325dd8
MINOR: small code optimizations in streams
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1176 from ymatsuda/optimize
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd5325dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd5325dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd5325dd
Branch: refs/heads/trunk
Commit: bd5325dd8be7c5cf920acee2aa33b3c288bd551a
Parents: 75ec67e
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Apr 1 17:14:29 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 1 17:14:29 2016 -0700
----------------------------------------------------------------------
.../processor/internals/PartitionGroup.java | 6 +++-
.../streams/processor/internals/StreamTask.java | 12 +++----
.../processor/internals/StreamThread.java | 35 ++++++++------------
3 files changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd5325dd/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index b487ff5..3d8f792 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -49,6 +49,10 @@ public class PartitionGroup {
public TopicPartition partition() {
return queue.partition();
}
+
+ public RecordQueue queue() {
+ return queue;
+ }
}
// since task is thread-safe, we do not need to synchronize on local variables
@@ -88,7 +92,7 @@ public class PartitionGroup {
// get the first record from this queue.
record = queue.poll();
- if (queue.size() > 0) {
+ if (!queue.isEmpty()) {
queuesByTime.offer(queue);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd5325dd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index afa303c..61aeced 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -179,7 +179,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
- if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
+ if (recordInfo.queue().size() == this.maxBufferedSize) {
consumer.resume(singleton(partition));
requiresPoll = true;
}
@@ -320,13 +320,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
@SuppressWarnings("unchecked")
public <K, V> void forward(K key, V value) {
ProcessorNode thisNode = currNode;
- for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
- currNode = childNode;
- try {
+ try {
+ for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+ currNode = childNode;
childNode.process(key, value);
- } finally {
- currNode = thisNode;
}
+ } finally {
+ currNode = thisNode;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd5325dd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d6b98f..c2a8e06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -350,9 +350,12 @@ public class StreamThread extends Thread {
requiresPoll = requiresPoll || task.requiresPoll();
sensors.processTimeSensor.record(time.milliseconds() - startProcess);
- }
- maybePunctuate();
+ maybePunctuate(task);
+
+ if (task.commitNeeded())
+ commitOne(task, time.milliseconds());
+ }
// if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
// even when we paused all partitions.
@@ -424,18 +427,16 @@ public class StreamThread extends Thread {
return true;
}
- private void maybePunctuate() {
- for (StreamTask task : activeTasks.values()) {
- try {
- long now = time.milliseconds();
+ private void maybePunctuate(StreamTask task) {
+ try {
+ long now = time.milliseconds();
- if (task.maybePunctuate(now))
- sensors.punctuateTimeSensor.record(time.milliseconds() - now);
+ if (task.maybePunctuate(now))
+ sensors.punctuateTimeSensor.record(time.milliseconds() - now);
- } catch (KafkaException e) {
- log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
+ } catch (KafkaException e) {
+ log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+ throw e;
}
}
@@ -449,16 +450,6 @@ public class StreamThread extends Thread {
lastCommit = now;
processStandbyRecords = true;
- } else {
- for (StreamTask task : activeTasks.values()) {
- try {
- if (task.commitNeeded())
- commitOne(task, time.milliseconds());
- } catch (KafkaException e) {
- log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
- }
}
}