You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2020/02/09 07:39:22 UTC

[activemq] branch activemq-5.15.x updated: AMQ-7376 - Use correct type for collections retrieval

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new ccbc77a  AMQ-7376 - Use correct type for collections retrieval
ccbc77a is described below

commit ccbc77aae13d16ed505befc7f0fd49337d8fe293
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Mon Jan 20 18:35:01 2020 +0000

    AMQ-7376 - Use correct type for collections retrieval
    
    (cherry picked from commit 675fb7bcae3a84c9fcbb1e35506fd688066fc59e)
---
 .../org/apache/activemq/broker/region/Queue.java   | 40 ++++++----------------
 .../transport/mqtt/MQTTProtocolConverter.java      |  3 +-
 2 files changed, 11 insertions(+), 32 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index d668c14..ecd5e89 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -404,25 +404,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         }
     }
 
-    /*
-     * Holder for subscription that needs attention on next iterate browser
-     * needs access to existing messages in the queue that have already been
-     * dispatched
-     */
-    class BrowserDispatch {
-        QueueBrowserSubscription browser;
-
-        public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
-            browser = browserSubscription;
-            browser.incrementQueueRef();
-        }
-
-        public QueueBrowserSubscription getBrowser() {
-            return browser;
-        }
-    }
-
-    ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
+    ConcurrentLinkedQueue<QueueBrowserSubscription> browserSubscriptions = new ConcurrentLinkedQueue<>();
 
     @Override
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
@@ -471,8 +453,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             if (sub instanceof QueueBrowserSubscription) {
                 // tee up for dispatch in next iterate
                 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
-                BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
-                browserDispatches.add(browserDispatch);
+                browserSubscription.incrementQueueRef();
+                browserSubscriptions.add(browserSubscription);
             }
 
             if (!this.optimizedDispatch) {
@@ -585,7 +567,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty());
                 if (sub instanceof QueueBrowserSubscription) {
                     ((QueueBrowserSubscription)sub).decrementQueueRef();
-                    browserDispatches.remove(sub);
+                    browserSubscriptions.remove(sub);
                 }
                 // AMQ-5107: don't resend if the broker is shutting down
                 if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) {
@@ -1664,7 +1646,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 pagedInPendingDispatchLock.readLock().unlock();
             }
 
-            boolean hasBrowsers = !browserDispatches.isEmpty();
+            boolean hasBrowsers = !browserSubscriptions.isEmpty();
 
             if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
                 try {
@@ -1684,15 +1666,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     pagedInMessagesLock.readLock().unlock();
                 }
 
-                Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
+                Iterator<QueueBrowserSubscription> browsers = browserSubscriptions.iterator();
                 while (browsers.hasNext()) {
-                    BrowserDispatch browserDispatch = browsers.next();
+                    QueueBrowserSubscription browser = browsers.next();
                     try {
                         MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                         msgContext.setDestination(destination);
 
-                        QueueBrowserSubscription browser = browserDispatch.getBrowser();
-
                         LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
                         boolean added = false;
                         for (MessageReference node : messagesInMemory) {
@@ -1707,12 +1687,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                         // are we done browsing? no new messages paged
                         if (!added || browser.atMax()) {
                             browser.decrementQueueRef();
-                            browserDispatches.remove(browserDispatch);
+                            browsers.remove();
                         } else {
                             wakeup();
                         }
                     } catch (Exception e) {
-                        LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
+                        LOG.warn("exception on dispatch to browser: {}", browser, e);
                     }
                 }
             }
@@ -2070,7 +2050,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     }
 
     private final boolean haveRealConsumer() {
-        return consumers.size() - browserDispatches.size() > 0;
+        return consumers.size() - browserSubscriptions.size() > 0;
     }
 
     private void doDispatch(PendingList list) throws Exception {
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 23ca5fa..3b0713f 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
-import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -558,7 +557,7 @@ public class MQTTProtocolConverter {
 
         ActiveMQDestination destination;
         synchronized (activeMQDestinationMap) {
-            destination = activeMQDestinationMap.get(command.topicName());
+            destination = activeMQDestinationMap.get(command.topicName().toString());
             if (destination == null) {
                 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
                 try {