You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/26 17:01:10 UTC
[activemq-artemis] branch master updated: ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 64ba930 ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time
new 549e167 This closes #2771
64ba930 is described below
commit 64ba930f43b8fa3a78730d875d7e354cb9631675
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Thu Jul 25 09:23:47 2019 +0200
ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time
---
.../core/server/impl/ServerConsumerImpl.java | 79 ++++++++++++----------
1 file changed, 45 insertions(+), 34 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index c709d4e..e19b1e5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -729,58 +730,68 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void setStarted(final boolean started) {
- synchronized (lock) {
- boolean locked = lockDelivery();
-
+ lockDelivery(locked -> {
// This is to make sure nothing would sneak to the client while started = false
// the client will stop the session and perform a rollback in certain cases.
// in case something sneaks to the client you could get to messaging delivering forever until
// you restart the server
- try {
- this.started = browseOnly || started;
- } finally {
- if (locked) {
- lockDelivery.writeLock().unlock();
- }
- }
- }
-
+ this.started = browseOnly || started;
+ });
// Outside the lock
if (started) {
promptDelivery();
}
}
- private boolean lockDelivery() {
- try {
- if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
- ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
- if (server != null) {
- server.threadDump();
+ private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
+ private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100);
+
+ private boolean lockDelivery(java.util.function.Consumer<Boolean> task) {
+ final long startWait = System.nanoTime();
+ long now;
+ while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) {
+ try {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
+ synchronized (lock) {
+ task.accept(false);
}
return false;
}
- return true;
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
- return false;
+ synchronized (lock) {
+ if (lockDelivery.writeLock().tryLock()) {
+ try {
+ task.accept(true);
+ } finally {
+ lockDelivery.writeLock().unlock();
+ }
+ return true;
+ }
+ }
+ //entering the lock can take some time: discount that time from the
+ //time before attempting to lock delivery
+ final long timeToLock = System.nanoTime() - now;
+ if (timeToLock < TRY_LOCK_NS) {
+ final long timeToWait = TRY_LOCK_NS - timeToLock;
+ LockSupport.parkNanos(timeToWait);
+ }
}
+ ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
+ if (server != null) {
+ server.threadDump();
+ }
+ synchronized (lock) {
+ task.accept(false);
+ }
+ return false;
}
@Override
public void setTransferring(final boolean transferring) {
- synchronized (lock) {
- // This is to make sure that the delivery process has finished any pending delivery
- // otherwise a message may sneak in on the client while we are trying to stop the consumer
- boolean locked = lockDelivery();
- try {
- this.transferring = transferring;
- } finally {
- if (locked) {
- lockDelivery.writeLock().unlock();
- }
- }
- }
+ lockDelivery(locked -> this.transferring = transferring);
// Outside the lock
if (transferring) {