You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/27 20:26:33 UTC

[activemq-artemis] 02/03: ARTEMIS-2434 Improving Consumer/Queue Delivery Lock

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 28ea18ea790820257e279bd82d14aa857f37a80c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sat Jul 27 10:58:39 2019 -0400

    ARTEMIS-2434 Improving Consumer/Queue Delivery Lock
    
    This is a less invasive improvement then the one I proposed at PR #2772
    or commit 7507a9fd4b282523c2b2f3517ed788153a35df4c
---
 .../core/server/impl/ServerConsumerImpl.java       | 51 ++++++----------------
 .../tests/integration/jms/client/GroupingTest.java |  1 -
 2 files changed, 14 insertions(+), 38 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index c709d4e..568af35 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -27,8 +27,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -65,6 +63,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.FutureLatch;
+import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
@@ -107,12 +106,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private SlowConsumerDetectionListener slowConsumerListener;
 
-   /**
-    * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
-    * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
-    * otherwise a rollback may get message sneaking in
-    */
-   private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
+   private final ReusableLatch pendingDelivery = new ReusableLatch(0);
 
    private volatile AtomicInteger availableCredits = new AtomicInteger(0);
 
@@ -481,7 +475,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
          }
 
-         lockDelivery.readLock().lock();
+         pendingDelivery.countUp();
 
          return HandleStatus.HANDLED;
       }
@@ -510,7 +504,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             deliverStandardMessage(reference, message);
          }
       } finally {
-         lockDelivery.readLock().unlock();
+         pendingDelivery.countDown();
          callback.afterDelivery();
          if (server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
@@ -730,30 +724,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {
-         boolean locked = lockDelivery();
-
-         // This is to make sure nothing would sneak to the client while started = false
-         // the client will stop the session and perform a rollback in certain cases.
-         // in case something sneaks to the client you could get to messaging delivering forever until
-         // you restart the server
-         try {
-            this.started = browseOnly || started;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
+         this.started = browseOnly || started;
       }
 
       // Outside the lock
       if (started) {
          promptDelivery();
+      } else {
+         flushDelivery();
       }
    }
 
-   private boolean lockDelivery() {
+   private boolean flushDelivery() {
       try {
-         if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
+         if (!pendingDelivery.await(30, TimeUnit.SECONDS)) {
             ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
             if (server != null) {
                server.threadDump();
@@ -770,16 +754,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    @Override
    public void setTransferring(final boolean transferring) {
       synchronized (lock) {
-         // This is to make sure that the delivery process has finished any pending delivery
-         // otherwise a message may sneak in on the client while we are trying to stop the consumer
-         boolean locked = lockDelivery();
-         try {
-            this.transferring = transferring;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
+         this.transferring = transferring;
       }
 
       // Outside the lock
@@ -801,6 +776,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       if (!transferring) {
          promptDelivery();
+      } else {
+         flushDelivery();
       }
    }
 
@@ -1275,7 +1252,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
 
       public boolean deliver() throws Exception {
-         lockDelivery.readLock().lock();
+         pendingDelivery.countUp();
          try {
             if (!started) {
                return false;
@@ -1392,7 +1369,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
             return true;
          } finally {
-            lockDelivery.readLock().unlock();
+            pendingDelivery.countDown();
          }
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index ba8cd95..fc32c44 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -271,7 +271,6 @@ public class GroupingTest extends JMSTestBase {
 
          assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
       }
-      Thread.sleep(2000);
       //session.rollback();
       //session.close();
       //consume all msgs from 2nd first consumer