You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2020/03/07 21:16:07 UTC

[activemq-artemis] 01/03: fix for #ARTEMIS-2476

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit e608c9af2cda8e3ebd92e220223c17906c6efc02
Author: Assen Sharlandjiev <as...@gmail.com>
AuthorDate: Mon Sep 16 10:38:20 2019 +0300

    fix for #ARTEMIS-2476 (MQTT retained message handling in MQTTRetainMessageManager)
---
 .../protocol/mqtt/MQTTRetainMessageManager.java    | 36 ++++++++++------------
 1 file changed, 17 insertions(+), 19 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index a7381ea..841c7c8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -52,19 +52,12 @@ public class MQTTRetainMessageManager {
          queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
       }
 
-      try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
-         synchronized (queue) {
-            if (iterator.hasNext()) {
-               MessageReference ref = iterator.next();
-               iterator.remove();
-               queue.acknowledge(tx, ref);
-            }
+      queue.deleteAllReferences();
 
-            if (!reset) {
-               sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
-            }
-         }
+      if (!reset) {
+         sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
       }
+
    }
 
    // SEND to Queue.
@@ -76,18 +69,23 @@ public class MQTTRetainMessageManager {
       // Iterate over all matching retain queues and add the queue
       Transaction tx = session.getServerSession().newTransaction();
       try {
-         synchronized (queue) {
-            for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
-               Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
-               try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
-                  if (i.hasNext()) {
-                     Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
-                     sendToQueue(message, queue, tx);
+         for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
+            Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
+            try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
+               if (i.hasNext()) {
+                  MessageReference ref = i.next();
+                  while (i.hasNext()) {
+                     ref = i.next();
+                     if (i.hasNext()) {
+                        i.remove();
+                     }
                   }
+                  Message message = ref.getMessage().copy(session.getServer().getStorageManager().generateID());
+                  sendToQueue(message, queue, tx);
                }
             }
          }
-      } catch (Throwable t) {
+      } catch (Exception t) {
          tx.rollback();
          throw t;
       }