You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/30 14:53:49 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1379 Some queue stats not
threadsafe
ARTEMIS-1379 Some queue stats not threadsafe
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2f5a9322
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2f5a9322
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2f5a9322
Branch: refs/heads/master
Commit: 2f5a9322d0ae2d400cf06b962afed1315c8a02d4
Parents: f37093e3
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Aug 29 22:13:27 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 30 10:53:43 2017 -0400
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 50 +++++------
.../management/ManagementTestBase.java | 9 ++
.../management/QueueControlTest.java | 87 ++++++++++++++++++--
3 files changed, 112 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f5a9322/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 35dd5ed..1f1f4fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -177,13 +177,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
- private long messagesAdded;
+ private AtomicLong messagesAdded = new AtomicLong(0);
- private long messagesAcknowledged;
+ private AtomicLong messagesAcknowledged = new AtomicLong(0);
- private long messagesExpired;
+ private AtomicLong messagesExpired = new AtomicLong(0);
- private long messagesKilled;
+ private AtomicLong messagesKilled = new AtomicLong(0);
protected final AtomicInteger deliveringCount = new AtomicInteger(0);
@@ -637,7 +637,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
directDeliver = false;
if (!ref.isPaged()) {
- messagesAdded++;
+ messagesAdded.incrementAndGet();
}
}
@@ -702,7 +702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
synchronized (this) {
if (!ref.isPaged()) {
- messagesAdded++;
+ messagesAdded.incrementAndGet();
}
}
@@ -1132,11 +1132,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (reason == AckReason.EXPIRED) {
- messagesExpired++;
+ messagesExpired.incrementAndGet();
} else if (reason == AckReason.KILLED) {
- messagesKilled++;
+ messagesKilled.incrementAndGet();
} else {
- messagesAcknowledged++;
+ messagesAcknowledged.incrementAndGet();
}
if (server != null) {
@@ -1170,11 +1170,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (reason == AckReason.EXPIRED) {
- messagesExpired++;
+ messagesExpired.incrementAndGet();
} else if (reason == AckReason.KILLED) {
- messagesKilled++;
+ messagesKilled.incrementAndGet();
} else {
- messagesAcknowledged++;
+ messagesAcknowledged.incrementAndGet();
}
if (server != null) {
@@ -1195,7 +1195,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// https://issues.jboss.org/browse/HORNETQ-609
incDelivering();
- messagesAcknowledged++;
+ messagesAcknowledged.incrementAndGet();
}
private RefsOperation getRefsOperation(final Transaction tx) {
@@ -1314,7 +1314,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void incrementMesssagesAdded() {
- messagesAdded++;
+ messagesAdded.incrementAndGet();
}
@Override
@@ -1332,25 +1332,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public long getMessagesAdded() {
if (pageSubscription != null) {
- return messagesAdded + pageSubscription.getCounter().getValueAdded();
+ return messagesAdded.get() + pageSubscription.getCounter().getValueAdded();
} else {
- return messagesAdded;
+ return messagesAdded.get();
}
}
@Override
public long getMessagesAcknowledged() {
- return messagesAcknowledged;
+ return messagesAcknowledged.get();
}
@Override
public long getMessagesExpired() {
- return messagesExpired;
+ return messagesExpired.get();
}
@Override
public long getMessagesKilled() {
- return messagesKilled;
+ return messagesKilled.get();
}
@Override
@@ -2057,7 +2057,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
internalAddTail(ref);
if (!ref.isPaged()) {
- messagesAdded++;
+ messagesAdded.incrementAndGet();
}
if (added++ > MAX_DELIVERIES_IN_LOOP) {
@@ -2734,7 +2734,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
groups.put(groupID, consumer);
}
- messagesAdded++;
+ messagesAdded.incrementAndGet();
deliveriesInTransit.countUp();
proceedDeliver(consumer, ref);
@@ -2911,22 +2911,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized void resetMessagesAdded() {
- messagesAdded = 0;
+ messagesAdded.set(0);
}
@Override
public synchronized void resetMessagesAcknowledged() {
- messagesAcknowledged = 0;
+ messagesAcknowledged.set(0);
}
@Override
public synchronized void resetMessagesExpired() {
- messagesExpired = 0;
+ messagesExpired.set(0);
}
@Override
public synchronized void resetMessagesKilled() {
- messagesKilled = 0;
+ messagesKilled.set(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f5a9322/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
index 6647c43..ac09745 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
@@ -20,6 +20,7 @@ import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -104,6 +105,14 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
return queueControl;
}
+ protected QueueControl createManagementControl(final SimpleString address,
+ final SimpleString queue,
+ final RoutingType routingType) throws Exception {
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, routingType, mbeanServer);
+
+ return queueControl;
+ }
+
protected long getMessageCount(QueueControl control) throws Exception {
control.flushExecutor();
return control.getMessageCount();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f5a9322/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 267549f..404f466 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -24,11 +24,14 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -48,7 +51,6 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManage
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.utils.Base64;
@@ -606,6 +608,81 @@ public class QueueControlTest extends ManagementTestBase {
}
@Test
+ public void testMessagesAddedAndMessagesAcknowledged() throws Exception {
+ final int THREAD_COUNT = 5;
+ final int MSG_COUNT = 1000;
+
+ CountDownLatch producerCountDown = new CountDownLatch(THREAD_COUNT);
+ CountDownLatch consumerCountDown = new CountDownLatch(THREAD_COUNT);
+
+ ExecutorService producerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
+ ExecutorService consumerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ try {
+ session.createQueue(address, RoutingType.ANYCAST, queue, null, false);
+
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ producerExecutor.submit(() -> {
+ try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientProducer producer = session.createProducer(address)) {
+ for (int j = 0; j < MSG_COUNT; j++) {
+ producer.send(session.createMessage(false));
+ Thread.sleep(5);
+ }
+ producerCountDown.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ consumerExecutor.submit(() -> {
+ try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientConsumer consumer = session.createConsumer(queue)) {
+ session.start();
+ for (int j = 0; j < MSG_COUNT; j++) {
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ }
+ session.commit();
+ consumerCountDown.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ producerCountDown.await(30, TimeUnit.SECONDS);
+ consumerCountDown.await(30, TimeUnit.SECONDS);
+
+ QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
+ Assert.assertEquals(0, queueControl.getMessageCount());
+ Assert.assertEquals(0, queueControl.getConsumerCount());
+ Assert.assertEquals(0, queueControl.getDeliveringCount());
+ Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAdded());
+ Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAcknowledged());
+
+ session.deleteQueue(queue);
+ } finally {
+ shutdownExecutor(producerExecutor);
+ shutdownExecutor(consumerExecutor);
+ }
+ }
+
+ private void shutdownExecutor(ExecutorService executor) {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
public void testListMessagesAsJSONWithNullFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
@@ -2153,14 +2230,6 @@ public class QueueControlTest extends ManagementTestBase {
session.start();
}
- @Override
- protected QueueControl createManagementControl(final SimpleString address,
- final SimpleString queue) throws Exception {
- QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, mbeanServer);
-
- return queueControl;
- }
-
protected long getFirstMessageId(final QueueControl queueControl) throws Exception {
JsonArray array = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
JsonObject object = (JsonObject) array.get(0);