You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/05/03 18:49:59 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1140 Avoid lock on queue
for message counts
ARTEMIS-1140 Avoid lock on queue for message counts
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/33f2ad65
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/33f2ad65
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/33f2ad65
Branch: refs/heads/master
Commit: 33f2ad65c915a8fa2c3606271f106bf5703ace83
Parents: 3549377
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed May 3 12:10:59 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed May 3 13:49:44 2017 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 32 +++++++++++---------
1 file changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33f2ad65/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index c2cfdef..8a0cda0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -214,6 +214,8 @@ public class QueueImpl implements Queue {
// We cache the consumers here since we don't want to include the redistributor
+ private final AtomicInteger consumersCount = new AtomicInteger();
+
private final Set<Consumer> consumerSet = new HashSet<>();
private final Map<SimpleString, Consumer> groups = new HashMap<>();
@@ -807,7 +809,9 @@ public class QueueImpl implements Queue {
consumerList.add(new ConsumerHolder(consumer));
- consumerSet.add(consumer);
+ if (consumerSet.add(consumer)) {
+ consumersCount.incrementAndGet();
+ }
if (refCountForConsumers != null) {
refCountForConsumers.increment();
@@ -837,7 +841,9 @@ public class QueueImpl implements Queue {
pos = consumerList.size() - 1;
}
- consumerSet.remove(consumer);
+ if (consumerSet.remove(consumer)) {
+ consumersCount.decrementAndGet();
+ }
LinkedList<SimpleString> groupsToRemove = null;
@@ -924,8 +930,8 @@ public class QueueImpl implements Queue {
}
@Override
- public synchronized int getConsumerCount() {
- return consumerSet.size();
+ public int getConsumerCount() {
+ return consumersCount.get();
}
@Override
@@ -1011,16 +1017,14 @@ public class QueueImpl implements Queue {
@Override
public long getMessageCount() {
- synchronized (this) {
- if (pageSubscription != null) {
- // messageReferences will have depaged messages which we need to discount from the counter as they are
- // counted on the pageSubscription as well
- return messageReferences.size() + getScheduledCount() +
- deliveringCount.get() +
- pageSubscription.getMessageCount();
- } else {
- return messageReferences.size() + getScheduledCount() + deliveringCount.get();
- }
+ if (pageSubscription != null) {
+ // messageReferences will have depaged messages which we need to discount from the counter as they are
+ // counted on the pageSubscription as well
+ return messageReferences.size() + getScheduledCount() +
+ deliveringCount.get() +
+ pageSubscription.getMessageCount();
+ } else {
+ return messageReferences.size() + getScheduledCount() + deliveringCount.get();
}
}