You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/13 22:13:17 UTC

kafka git commit: KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 4aba4bc1d -> 69b451e28


KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69b451e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69b451e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69b451e2

Branch: refs/heads/trunk
Commit: 69b451e28944deb162f7427105c3090f41c8797f
Parents: 4aba4bc
Author: Tim Brooks <tb...@gmail.com>
Authored: Mon Jul 13 13:13:02 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jul 13 13:13:02 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/consumer/KafkaConsumer.java  | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69b451e2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7aa0760..b4e8f7f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.utils.Utils.min;
@@ -395,6 +396,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
     private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
     private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+    private static final long NO_CURRENT_THREAD = -1L;
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
     private final Coordinator coordinator;
@@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
-    private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
+    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
     // refcount is used to allow reentrant access by the thread who has acquired currentThread
     private final AtomicInteger refcount = new AtomicInteger(0);
 
@@ -1355,8 +1357,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     private void acquire() {
         ensureNotClosed();
-        Long threadId = Thread.currentThread().getId();
-        if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
+        long threadId = Thread.currentThread().getId();
+        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
             throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
         refcount.incrementAndGet();
     }
@@ -1366,6 +1368,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     private void release() {
         if (refcount.decrementAndGet() == 0)
-            currentThread.set(null);
+            currentThread.set(NO_CURRENT_THREAD);
     }
 }