You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/13 22:13:17 UTC
kafka git commit: KAFKA-2312: use atomic long for thread id reference;
reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang
Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 4aba4bc1d -> 69b451e28
KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69b451e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69b451e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69b451e2
Branch: refs/heads/trunk
Commit: 69b451e28944deb162f7427105c3090f41c8797f
Parents: 4aba4bc
Author: Tim Brooks <tb...@gmail.com>
Authored: Mon Jul 13 13:13:02 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jul 13 13:13:02 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/consumer/KafkaConsumer.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/69b451e2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7aa0760..b4e8f7f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -51,6 +51,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.min;
@@ -395,6 +396,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+ private static final long NO_CURRENT_THREAD = -1L;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Coordinator coordinator;
@@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
- private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
+ private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// refcount is used to allow reentrant access by the thread who has acquired currentThread
private final AtomicInteger refcount = new AtomicInteger(0);
@@ -1355,8 +1357,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
private void acquire() {
ensureNotClosed();
- Long threadId = Thread.currentThread().getId();
- if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
+ long threadId = Thread.currentThread().getId();
+ if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
@@ -1366,6 +1368,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
private void release() {
if (refcount.decrementAndGet() == 0)
- currentThread.set(null);
+ currentThread.set(NO_CURRENT_THREAD);
}
}