You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/13 15:44:34 UTC

[activemq-artemis] branch master updated: ARTEMIS-2489 ring q fails w/concurrent producers

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new c56b8fb  ARTEMIS-2489 ring q fails w/concurrent producers
     new de0da64  This closes #2835
c56b8fb is described below

commit c56b8fb9d2bad24fd6be95dfea89e46e405ae7e2
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Sep 12 14:34:13 2019 -0500

    ARTEMIS-2489 ring q fails w/concurrent producers
---
 .../artemis/core/server/impl/QueueImpl.java        | 15 ++--
 .../tests/integration/server/RingQueueTest.java    | 81 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 7 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d18ecbf..2f6e93b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -928,7 +928,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       synchronized (this) {
          try {
             if (ringSize != -1) {
-               enforceRing(ref, scheduling);
+               enforceRing(ref, scheduling, true);
             }
 
             if (!ref.isAlreadyAcked()) {
@@ -1026,8 +1026,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void addTail(final MessageReference ref, final boolean direct) {
       enterCritical(CRITICAL_PATH_ADD_TAIL);
       try {
-         enforceRing();
-
          if (scheduleIfPossible(ref)) {
             return;
          }
@@ -2592,6 +2590,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       refAdded(ref);
       messageReferences.addTail(ref, getPriority(ref));
       pendingMetrics.incrementMetrics(ref);
+      enforceRing(false);
    }
 
    /**
@@ -4072,14 +4071,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private void enforceRing() {
+   private void enforceRing(boolean head) {
       if (ringSize != -1) { // better escaping & inlining when ring isn't being used
-         enforceRing(null, false);
+         enforceRing(null, false, head);
       }
    }
 
-   private void enforceRing(MessageReference refToAck, boolean scheduling) {
-      if (getMessageCountForRing() >= ringSize) {
+   private void enforceRing(MessageReference refToAck, boolean scheduling, boolean head) {
+      int adjustment = head ? 1 : 0;
+
+      if (getMessageCountForRing() + adjustment > ringSize) {
          refToAck = refToAck == null ? messageReferences.poll() : refToAck;
 
          if (refToAck != null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
index 7343b6b..9b06a84 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
@@ -20,6 +20,8 @@ import javax.jms.Connection;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueAttributes;
@@ -330,6 +332,42 @@ public class RingQueueTest extends ActiveMQTestBase {
       Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
    }
 
+   @Test
+   public void testMultipleConcurrentProducers() throws Exception { // nThreads producers each send RING_SIZE messages; the queue must end up holding exactly RING_SIZE
+      final long RING_SIZE = 25;
+      ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession clientSession = addClientSession(sf.createSession(false, true, true));
+      clientSession.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setRingSize(RING_SIZE).setMaxConsumers(-1).setPurgeOnNoConsumers(false)); // queue under test, configured with ring size RING_SIZE
+      clientSession.start();
+      final Queue queue = server.locateQueue(qName);
+      assertEquals(RING_SIZE, queue.getRingSize()); // sanity check: the server-side queue reports the configured ring size
+      final int nThreads = 25;
+      final long numberOfMessages = RING_SIZE; // messages sent by each producer
+
+      SomeProducer[] producers = new SomeProducer[nThreads];
+
+      try {
+         for (int i = 0; i < nThreads; i++) { // build every producer first; each constructor opens its own locator, factory and session
+            producers[i] = new SomeProducer(numberOfMessages, nThreads, address);
+         }
+
+         for (int i = 0; i < nThreads; i++) { // then start them all so the sends overlap
+            producers[i].start();
+         }
+
+         for (SomeProducer producer : producers) {
+            producer.join(); // wait for the thread to finish before reading its error counter
+            assertEquals(0, producer.errors.get());
+         }
+      } catch (Exception e) {
+         e.printStackTrace();
+         fail(e.getMessage());
+      }
+
+      Wait.assertTrue("message count should be " + RING_SIZE + " but it's actually " + queue.getMessageCount(), () -> queue.getMessageCount() == RING_SIZE, 2000, 100); // NOTE(review): the message string is built once, before waiting, so the count it reports may differ from the count at failure time; 2000/100 are presumably timeout and poll interval in ms - confirm
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -339,4 +377,47 @@ public class RingQueueTest extends ActiveMQTestBase {
       // start the server
       server.start();
    }
+
+   class SomeProducer extends Thread { // producer thread: sends numberOfMessages messages to address over its own locator, session factory and session
+
+      final ClientSessionFactory factory;
+      final ServerLocator locator;
+      final ClientSession prodSession;
+      public final AtomicInteger errors = new AtomicInteger(0); // incremented once if run() catches any Throwable
+      final long numberOfMessages;
+      final int nThreads; // used only as the modulus for the "prodNR" property
+      final SimpleString address;
+
+      SomeProducer(long numberOfMessages, int nThreads, SimpleString address) throws Exception { // opens locator, factory and session on the constructing thread, before start()
+         locator = createNettyNonHALocator();
+         factory = locator.createSessionFactory();
+         prodSession = factory.createSession(true, false); // presumably (autoCommitSends, autoCommitAcks) - TODO confirm
+         this.numberOfMessages = numberOfMessages;
+         this.nThreads = nThreads;
+         this.address = address;
+      }
+
+      @Override
+      public void run() { // sends the messages, then always closes the session and locator
+         try {
+            ClientProducer producer = prodSession.createProducer(address);
+            for (int i = 0; i < numberOfMessages; i++) {
+               ClientMessage message = prodSession.createMessage(true); // presumably durable=true - TODO confirm
+               message.putIntProperty("prodNR", i % nThreads); // NOTE(review): value is the loop index modulo nThreads, not an identifier of this producer
+               producer.send(message);
+            }
+
+         } catch (Throwable e) { // failures are printed and counted rather than rethrown
+            e.printStackTrace();
+            errors.incrementAndGet();
+         } finally {
+            try {
+               prodSession.close();
+               locator.close(); // NOTE(review): skipped if prodSession.close() throws; factory is not closed explicitly - presumably covered by the locator, confirm
+            } catch (Throwable ignored) {
+               ignored.printStackTrace();
+            }
+         }
+      }
+   }
 }