You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/02 21:30:31 UTC
kafka git commit: MINOR: Remove unnecessary synchronized block in
org.apache.kafka.streams.processor.internals.StreamTask
Repository: kafka
Updated Branches:
refs/heads/trunk bb629f224 -> e62dd4cb7
MINOR: Remove unnecessary synchronized block in org.apache.kafka.streams.processor.internals.StreamTask
The StreamTask is owned by a specific thread, so it doesn't seem necessary to synchronize the processing of the records, as discussed with guozhangwang on the dev mailing list
Author: PierreCoquentin <pi...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1688 from PierreCoquentin/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e62dd4cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e62dd4cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e62dd4cb
Branch: refs/heads/trunk
Commit: e62dd4cb770e3662524d4f2f50547549e3ab2eaf
Parents: bb629f2
Author: Pierre Coquentin <pi...@gmail.com>
Authored: Tue Aug 2 14:30:12 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 2 14:30:12 2016 -0700
----------------------------------------------------------------------
.../streams/processor/internals/StreamTask.java | 66 ++++++++++----------
1 file changed, 32 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e62dd4cb/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3126dd4..18b7646 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -152,51 +152,49 @@ public class StreamTask extends AbstractTask implements Punctuator {
*/
@SuppressWarnings("unchecked")
public int process() {
- synchronized (this) {
- // get the next record to process
- StampedRecord record = partitionGroup.nextRecord(recordInfo);
+ // get the next record to process
+ StampedRecord record = partitionGroup.nextRecord(recordInfo);
- // if there is no record to process, return immediately
- if (record == null) {
- requiresPoll = true;
- return 0;
- }
-
- requiresPoll = false;
+ // if there is no record to process, return immediately
+ if (record == null) {
+ requiresPoll = true;
+ return 0;
+ }
- try {
- // process the record by passing to the source node of the topology
- this.currRecord = record;
- this.currNode = recordInfo.node();
- TopicPartition partition = recordInfo.partition();
+ requiresPoll = false;
- log.debug("Start processing one record [{}]", currRecord);
+ try {
+ // process the record by passing to the source node of the topology
+ this.currRecord = record;
+ this.currNode = recordInfo.node();
+ TopicPartition partition = recordInfo.partition();
- this.currNode.process(currRecord.key(), currRecord.value());
+ log.debug("Start processing one record [{}]", currRecord);
- log.debug("Completed processing one record [{}]", currRecord);
+ this.currNode.process(currRecord.key(), currRecord.value());
- // update the consumed offset map after processing is done
- consumedOffsets.put(partition, currRecord.offset());
- commitOffsetNeeded = true;
+ log.debug("Completed processing one record [{}]", currRecord);
- // after processing this record, if its partition queue's buffered size has been
- // decreased to the threshold, we can then resume the consumption on this partition
- if (recordInfo.queue().size() == this.maxBufferedSize) {
- consumer.resume(singleton(partition));
- requiresPoll = true;
- }
+ // update the consumed offset map after processing is done
+ consumedOffsets.put(partition, currRecord.offset());
+ commitOffsetNeeded = true;
- if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
- requiresPoll = true;
- }
- } finally {
- this.currRecord = null;
- this.currNode = null;
+ // after processing this record, if its partition queue's buffered size has been
+ // decreased to the threshold, we can then resume the consumption on this partition
+ if (recordInfo.queue().size() == this.maxBufferedSize) {
+ consumer.resume(singleton(partition));
+ requiresPoll = true;
}
- return partitionGroup.numBuffered();
+ if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
+ requiresPoll = true;
+ }
+ } finally {
+ this.currRecord = null;
+ this.currNode = null;
}
+
+ return partitionGroup.numBuffered();
}
public boolean requiresPoll() {