You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/02 21:30:31 UTC

kafka git commit: MINOR: Remove unnecessary synchronized block in org.apache.kafka.streams.processor.internals.StreamTask

Repository: kafka
Updated Branches:
  refs/heads/trunk bb629f224 -> e62dd4cb7


MINOR: Remove unnecessary synchronized block in org.apache.kafka.streams.processor.internals.StreamTask

The StreamTask is owned by a specific thread, so it doesn't seem necessary to synchronize the processing of the records, as discussed with guozhangwang on the dev mailing list.

Author: PierreCoquentin <pi...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1688 from PierreCoquentin/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e62dd4cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e62dd4cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e62dd4cb

Branch: refs/heads/trunk
Commit: e62dd4cb770e3662524d4f2f50547549e3ab2eaf
Parents: bb629f2
Author: Pierre Coquentin <pi...@gmail.com>
Authored: Tue Aug 2 14:30:12 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 2 14:30:12 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/StreamTask.java | 66 ++++++++++----------
 1 file changed, 32 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e62dd4cb/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3126dd4..18b7646 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -152,51 +152,49 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     @SuppressWarnings("unchecked")
     public int process() {
-        synchronized (this) {
-            // get the next record to process
-            StampedRecord record = partitionGroup.nextRecord(recordInfo);
+        // get the next record to process
+        StampedRecord record = partitionGroup.nextRecord(recordInfo);
 
-            // if there is no record to process, return immediately
-            if (record == null) {
-                requiresPoll = true;
-                return 0;
-            }
-
-            requiresPoll = false;
+        // if there is no record to process, return immediately
+        if (record == null) {
+            requiresPoll = true;
+            return 0;
+        }
 
-            try {
-                // process the record by passing to the source node of the topology
-                this.currRecord = record;
-                this.currNode = recordInfo.node();
-                TopicPartition partition = recordInfo.partition();
+        requiresPoll = false;
 
-                log.debug("Start processing one record [{}]", currRecord);
+        try {
+            // process the record by passing to the source node of the topology
+            this.currRecord = record;
+            this.currNode = recordInfo.node();
+            TopicPartition partition = recordInfo.partition();
 
-                this.currNode.process(currRecord.key(), currRecord.value());
+            log.debug("Start processing one record [{}]", currRecord);
 
-                log.debug("Completed processing one record [{}]", currRecord);
+            this.currNode.process(currRecord.key(), currRecord.value());
 
-                // update the consumed offset map after processing is done
-                consumedOffsets.put(partition, currRecord.offset());
-                commitOffsetNeeded = true;
+            log.debug("Completed processing one record [{}]", currRecord);
 
-                // after processing this record, if its partition queue's buffered size has been
-                // decreased to the threshold, we can then resume the consumption on this partition
-                if (recordInfo.queue().size() == this.maxBufferedSize) {
-                    consumer.resume(singleton(partition));
-                    requiresPoll = true;
-                }
+            // update the consumed offset map after processing is done
+            consumedOffsets.put(partition, currRecord.offset());
+            commitOffsetNeeded = true;
 
-                if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
-                    requiresPoll = true;
-                }
-            } finally {
-                this.currRecord = null;
-                this.currNode = null;
+            // after processing this record, if its partition queue's buffered size has been
+            // decreased to the threshold, we can then resume the consumption on this partition
+            if (recordInfo.queue().size() == this.maxBufferedSize) {
+                consumer.resume(singleton(partition));
+                requiresPoll = true;
             }
 
-            return partitionGroup.numBuffered();
+            if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
+                requiresPoll = true;
+            }
+        } finally {
+            this.currRecord = null;
+            this.currNode = null;
         }
+
+        return partitionGroup.numBuffered();
     }
 
     public boolean requiresPoll() {