You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/29 14:40:42 UTC
[activemq-artemis] branch master updated: ARTEMIS-2856 consumer can be created on closed session
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new cd7f52d ARTEMIS-2856 consumer can be created on closed session
new 6070074 This closes #3226
cd7f52d is described below
commit cd7f52d4ba12a03cbac9f4f2d7a34c5cfe023eba
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Tue Jul 28 16:15:22 2020 -0500
ARTEMIS-2856 consumer can be created on closed session
Due to a lack of concurrency protections, it's possible to create a
consumer on a closed session. I've not been able to reproduce this with
a test, but I've seen it in the wild. Static code analysis points to a
need for better concurrency controls around closing the session and
creating consumers.
---
.../activemq/artemis/core/server/ActiveMQMessageBundle.java | 3 +++
.../activemq/artemis/core/server/impl/ServerSessionImpl.java | 12 +++++++++---
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index d96eb98..5da0521 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -492,4 +492,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229231, value = "Divert Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQDivertDoesNotExistException divertDoesNotExist(String divert);
+
+ @Message(id = 229232, value = "Cannot create consumer on {0}. Session is closed.", format = Message.Format.MESSAGE_FORMAT)
+ ActiveMQIllegalStateException cannotCreateConsumerOnClosedSession(SimpleString queueName);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d2ea24a..ff9082d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -426,6 +426,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
}
+ closed = true;
}
//putting closing of consumers outside the sync block
@@ -463,7 +464,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
callback.closed();
}
- closed = true;
//When the ServerSessionImpl is closed, need to create and send a SESSION_CLOSED notification.
sendSessionNotification(CoreNotificationType.SESSION_CLOSED);
@@ -559,8 +559,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
filterString, browseOnly, supportLargeMessage));
}
- ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
- consumers.put(consumer.getID(), consumer);
+ ServerConsumer consumer;
+ synchronized (this) {
+ if (closed) {
+ throw ActiveMQMessageBundle.BUNDLE.cannotCreateConsumerOnClosedSession(queueName);
+ }
+ consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
+ consumers.put(consumer.getID(), consumer);
+ }
if (server.hasBrokerConsumerPlugins()) {
server.callBrokerConsumerPlugins(plugin -> plugin.afterCreateConsumer(consumer));