You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/26 17:01:10 UTC

[activemq-artemis] branch master updated: ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 64ba930  ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time
     new 549e167  This closes #2771
64ba930 is described below

commit 64ba930f43b8fa3a78730d875d7e354cb9631675
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Thu Jul 25 09:23:47 2019 +0200

    ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time
---
 .../core/server/impl/ServerConsumerImpl.java       | 79 ++++++++++++----------
 1 file changed, 45 insertions(+), 34 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index c709d4e..e19b1e5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -729,58 +730,68 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    @Override
    public void setStarted(final boolean started) {
-      synchronized (lock) {
-         boolean locked = lockDelivery();
-
+      lockDelivery(locked -> {
          // This is to make sure nothing would sneak to the client while started = false
          // the client will stop the session and perform a rollback in certain cases.
          // in case something sneaks to the client you could get to messaging delivering forever until
          // you restart the server
-         try {
-            this.started = browseOnly || started;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
-      }
-
+         this.started = browseOnly || started;
+      });
       // Outside the lock
       if (started) {
          promptDelivery();
       }
    }
 
-   private boolean lockDelivery() {
-      try {
-         if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
-            ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
-            if (server != null) {
-               server.threadDump();
+   private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
+   private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100);
+
+   private boolean lockDelivery(java.util.function.Consumer<Boolean> task) {
+      final long startWait = System.nanoTime();
+      long now;
+      while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) {
+         try {
+            if (Thread.currentThread().isInterrupted()) {
+               throw new InterruptedException();
+            }
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
+            synchronized (lock) {
+               task.accept(false);
             }
             return false;
          }
-         return true;
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
-         return false;
+         synchronized (lock) {
+            if (lockDelivery.writeLock().tryLock()) {
+               try {
+                  task.accept(true);
+               } finally {
+                  lockDelivery.writeLock().unlock();
+               }
+               return true;
+            }
+         }
+         //entering the lock can take some time: discount that time from the
+         //time before attempting to lock delivery
+         final long timeToLock = System.nanoTime() - now;
+         if (timeToLock < TRY_LOCK_NS) {
+            final long timeToWait = TRY_LOCK_NS - timeToLock;
+            LockSupport.parkNanos(timeToWait);
+         }
       }
+      ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
+      if (server != null) {
+         server.threadDump();
+      }
+      synchronized (lock) {
+         task.accept(false);
+      }
+      return false;
    }
 
    @Override
    public void setTransferring(final boolean transferring) {
-      synchronized (lock) {
-         // This is to make sure that the delivery process has finished any pending delivery
-         // otherwise a message may sneak in on the client while we are trying to stop the consumer
-         boolean locked = lockDelivery();
-         try {
-            this.transferring = transferring;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
-      }
+      lockDelivery(locked -> this.transferring = transferring);
 
       // Outside the lock
       if (transferring) {