You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2020/03/07 21:16:07 UTC
[activemq-artemis] 01/03: fix for #ARTEMIS-2476
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit e608c9af2cda8e3ebd92e220223c17906c6efc02
Author: Assen Sharlandjiev <as...@gmail.com>
AuthorDate: Mon Sep 16 10:38:20 2019 +0300
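fix for #ARTEMIS-2476: delete all existing references from the retain queue before storing the new retained message, and when copying retained messages to a queue, drop older references and send only the newest one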
---
.../protocol/mqtt/MQTTRetainMessageManager.java | 36 ++++++++++------------
1 file changed, 17 insertions(+), 19 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index a7381ea..841c7c8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -52,19 +52,12 @@ public class MQTTRetainMessageManager {
queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
}
- try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
- synchronized (queue) {
- if (iterator.hasNext()) {
- MessageReference ref = iterator.next();
- iterator.remove();
- queue.acknowledge(tx, ref);
- }
+ queue.deleteAllReferences();
- if (!reset) {
- sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
- }
- }
+ if (!reset) {
+ sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
}
+
}
// SEND to Queue.
@@ -76,18 +69,23 @@ public class MQTTRetainMessageManager {
// Iterate over all matching retain queues and add the queue
Transaction tx = session.getServerSession().newTransaction();
try {
- synchronized (queue) {
- for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
- Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
- try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
- if (i.hasNext()) {
- Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
- sendToQueue(message, queue, tx);
+ for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
+ Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
+ try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
+ if (i.hasNext()) {
+ MessageReference ref = i.next();
+ while (i.hasNext()) {
+ ref = i.next();
+ if (i.hasNext()) {
+ i.remove();
+ }
}
+ Message message = ref.getMessage().copy(session.getServer().getStorageManager().generateID());
+ sendToQueue(message, queue, tx);
}
}
}
- } catch (Throwable t) {
+ } catch (Exception t) {
tx.rollback();
throw t;
}