You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/30 14:53:49 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1379 Some queue stats not threadsafe

ARTEMIS-1379 Some queue stats not threadsafe


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2f5a9322
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2f5a9322
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2f5a9322

Branch: refs/heads/master
Commit: 2f5a9322d0ae2d400cf06b962afed1315c8a02d4
Parents: f37093e3
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Aug 29 22:13:27 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 30 10:53:43 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 50 +++++------
 .../management/ManagementTestBase.java          |  9 ++
 .../management/QueueControlTest.java            | 87 ++++++++++++++++++--
 3 files changed, 112 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f5a9322/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 35dd5ed..1f1f4fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -177,13 +177,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
-   private long messagesAdded;
+   private AtomicLong messagesAdded = new AtomicLong(0);
 
-   private long messagesAcknowledged;
+   private AtomicLong messagesAcknowledged = new AtomicLong(0);
 
-   private long messagesExpired;
+   private AtomicLong messagesExpired = new AtomicLong(0);
 
-   private long messagesKilled;
+   private AtomicLong messagesKilled = new AtomicLong(0);
 
    protected final AtomicInteger deliveringCount = new AtomicInteger(0);
 
@@ -637,7 +637,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       directDeliver = false;
 
       if (!ref.isPaged()) {
-         messagesAdded++;
+         messagesAdded.incrementAndGet();
       }
    }
 
@@ -702,7 +702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
          synchronized (this) {
             if (!ref.isPaged()) {
-               messagesAdded++;
+               messagesAdded.incrementAndGet();
             }
          }
 
@@ -1132,11 +1132,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (reason == AckReason.EXPIRED) {
-         messagesExpired++;
+         messagesExpired.incrementAndGet();
       } else if (reason == AckReason.KILLED) {
-         messagesKilled++;
+         messagesKilled.incrementAndGet();
       } else {
-         messagesAcknowledged++;
+         messagesAcknowledged.incrementAndGet();
       }
 
       if (server != null) {
@@ -1170,11 +1170,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (reason == AckReason.EXPIRED) {
-         messagesExpired++;
+         messagesExpired.incrementAndGet();
       } else if (reason == AckReason.KILLED) {
-         messagesKilled++;
+         messagesKilled.incrementAndGet();
       } else {
-         messagesAcknowledged++;
+         messagesAcknowledged.incrementAndGet();
       }
 
       if (server != null) {
@@ -1195,7 +1195,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       // https://issues.jboss.org/browse/HORNETQ-609
       incDelivering();
 
-      messagesAcknowledged++;
+      messagesAcknowledged.incrementAndGet();
    }
 
    private RefsOperation getRefsOperation(final Transaction tx) {
@@ -1314,7 +1314,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void incrementMesssagesAdded() {
-      messagesAdded++;
+      messagesAdded.incrementAndGet();
    }
 
    @Override
@@ -1332,25 +1332,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public long getMessagesAdded() {
       if (pageSubscription != null) {
-         return messagesAdded + pageSubscription.getCounter().getValueAdded();
+         return messagesAdded.get() + pageSubscription.getCounter().getValueAdded();
       } else {
-         return messagesAdded;
+         return messagesAdded.get();
       }
    }
 
    @Override
    public long getMessagesAcknowledged() {
-      return messagesAcknowledged;
+      return messagesAcknowledged.get();
    }
 
    @Override
    public long getMessagesExpired() {
-      return messagesExpired;
+      return messagesExpired.get();
    }
 
    @Override
    public long getMessagesKilled() {
-      return messagesKilled;
+      return messagesKilled.get();
    }
 
    @Override
@@ -2057,7 +2057,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          internalAddTail(ref);
 
          if (!ref.isPaged()) {
-            messagesAdded++;
+            messagesAdded.incrementAndGet();
          }
 
          if (added++ > MAX_DELIVERIES_IN_LOOP) {
@@ -2734,7 +2734,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   groups.put(groupID, consumer);
                }
 
-               messagesAdded++;
+               messagesAdded.incrementAndGet();
 
                deliveriesInTransit.countUp();
                proceedDeliver(consumer, ref);
@@ -2911,22 +2911,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public synchronized void resetMessagesAdded() {
-      messagesAdded = 0;
+      messagesAdded.set(0);
    }
 
    @Override
    public synchronized void resetMessagesAcknowledged() {
-      messagesAcknowledged = 0;
+      messagesAcknowledged.set(0);
    }
 
    @Override
    public synchronized void resetMessagesExpired() {
-      messagesExpired = 0;
+      messagesExpired.set(0);
    }
 
    @Override
    public synchronized void resetMessagesKilled() {
-      messagesKilled = 0;
+      messagesKilled.set(0);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f5a9322/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
index 6647c43..ac09745 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
@@ -20,6 +20,7 @@ import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
 import javax.management.ObjectName;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -104,6 +105,14 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
       return queueControl;
    }
 
+   protected QueueControl createManagementControl(final SimpleString address,
+                                                  final SimpleString queue,
+                                                  final RoutingType routingType) throws Exception {
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, routingType, mbeanServer);
+
+      return queueControl;
+   }
+
    protected long getMessageCount(QueueControl control) throws Exception {
       control.flushExecutor();
       return control.getMessageCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f5a9322/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 267549f..404f466 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -24,11 +24,14 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -48,7 +51,6 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManage
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
 import org.apache.activemq.artemis.utils.Base64;
@@ -606,6 +608,81 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testMessagesAddedAndMessagesAcknowledged() throws Exception {
+      final int THREAD_COUNT = 5;
+      final int MSG_COUNT = 1000;
+
+      CountDownLatch producerCountDown = new CountDownLatch(THREAD_COUNT);
+      CountDownLatch consumerCountDown = new CountDownLatch(THREAD_COUNT);
+
+      ExecutorService producerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
+      ExecutorService consumerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      try {
+         session.createQueue(address, RoutingType.ANYCAST, queue, null, false);
+
+         for (int i = 0; i < THREAD_COUNT; i++) {
+            producerExecutor.submit(() -> {
+               try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientProducer producer = session.createProducer(address)) {
+                  for (int j = 0; j < MSG_COUNT; j++) {
+                     producer.send(session.createMessage(false));
+                     Thread.sleep(5);
+                  }
+                  producerCountDown.countDown();
+               } catch (Exception e) {
+                  e.printStackTrace();
+               }
+            });
+         }
+
+         for (int i = 0; i < THREAD_COUNT; i++) {
+            consumerExecutor.submit(() -> {
+               try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientConsumer consumer = session.createConsumer(queue)) {
+                  session.start();
+                  for (int j = 0; j < MSG_COUNT; j++) {
+                     ClientMessage message = consumer.receive(500);
+                     Assert.assertNotNull(message);
+                     message.acknowledge();
+                  }
+                  session.commit();
+                  consumerCountDown.countDown();
+               } catch (Exception e) {
+                  e.printStackTrace();
+               }
+            });
+         }
+
+         producerCountDown.await(30, TimeUnit.SECONDS);
+         consumerCountDown.await(30, TimeUnit.SECONDS);
+
+         QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
+         Assert.assertEquals(0, queueControl.getMessageCount());
+         Assert.assertEquals(0, queueControl.getConsumerCount());
+         Assert.assertEquals(0, queueControl.getDeliveringCount());
+         Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAdded());
+         Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAcknowledged());
+
+         session.deleteQueue(queue);
+      } finally {
+         shutdownExecutor(producerExecutor);
+         shutdownExecutor(consumerExecutor);
+      }
+   }
+
+   private void shutdownExecutor(ExecutorService executor) {
+      try {
+         executor.shutdown();
+         executor.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test
    public void testListMessagesAsJSONWithNullFilter() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
@@ -2153,14 +2230,6 @@ public class QueueControlTest extends ManagementTestBase {
       session.start();
    }
 
-   @Override
-   protected QueueControl createManagementControl(final SimpleString address,
-                                                  final SimpleString queue) throws Exception {
-      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, mbeanServer);
-
-      return queueControl;
-   }
-
    protected long getFirstMessageId(final QueueControl queueControl) throws Exception {
       JsonArray array = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
       JsonObject object = (JsonObject) array.get(0);