You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2020/02/09 07:33:19 UTC
[activemq] branch master updated: AMQ-7376 - Use correct type for collections retrieval
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 675fb7b AMQ-7376 - Use correct type for collections retrieval
new 6417d62 Merge pull request #428 from coheigea/AMQ-7376
675fb7b is described below
commit 675fb7bcae3a84c9fcbb1e35506fd688066fc59e
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Mon Jan 20 18:35:01 2020 +0000
AMQ-7376 - Use correct type for collections retrieval
---
.../org/apache/activemq/broker/region/Queue.java | 40 ++++++----------------
.../apache/activemq/store/kahadb/KahaDBStore.java | 2 +-
.../transport/mqtt/MQTTProtocolConverter.java | 3 +-
3 files changed, 12 insertions(+), 33 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 86b8c6d..a183b67 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -417,25 +417,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
- /*
- * Holder for subscription that needs attention on next iterate browser
- * needs access to existing messages in the queue that have already been
- * dispatched
- */
- class BrowserDispatch {
- QueueBrowserSubscription browser;
-
- public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
- browser = browserSubscription;
- browser.incrementQueueRef();
- }
-
- public QueueBrowserSubscription getBrowser() {
- return browser;
- }
- }
-
- ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
+ ConcurrentLinkedQueue<QueueBrowserSubscription> browserSubscriptions = new ConcurrentLinkedQueue<>();
@Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
@@ -484,8 +466,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (sub instanceof QueueBrowserSubscription) {
// tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
- BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
- browserDispatches.add(browserDispatch);
+ browserSubscription.incrementQueueRef();
+ browserSubscriptions.add(browserSubscription);
}
if (!this.optimizedDispatch) {
@@ -598,7 +580,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty());
if (sub instanceof QueueBrowserSubscription) {
((QueueBrowserSubscription)sub).decrementQueueRef();
- browserDispatches.remove(sub);
+ browserSubscriptions.remove(sub);
}
// AMQ-5107: don't resend if the broker is shutting down
if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) {
@@ -1723,7 +1705,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.readLock().unlock();
}
- boolean hasBrowsers = !browserDispatches.isEmpty();
+ boolean hasBrowsers = !browserSubscriptions.isEmpty();
if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
try {
@@ -1743,15 +1725,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInMessagesLock.readLock().unlock();
}
- Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
+ Iterator<QueueBrowserSubscription> browsers = browserSubscriptions.iterator();
while (browsers.hasNext()) {
- BrowserDispatch browserDispatch = browsers.next();
+ QueueBrowserSubscription browser = browsers.next();
try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
- QueueBrowserSubscription browser = browserDispatch.getBrowser();
-
LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
boolean added = false;
for (MessageReference node : messagesInMemory) {
@@ -1766,12 +1746,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// are we done browsing? no new messages paged
if (!added || browser.atMax()) {
browser.decrementQueueRef();
- browserDispatches.remove(browserDispatch);
+ browsers.remove();
} else {
wakeup();
}
} catch (Exception e) {
- LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
+ LOG.warn("exception on dispatch to browser: {}", browser, e);
}
}
}
@@ -2136,7 +2116,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
private final boolean haveRealConsumer() {
- return consumers.size() - browserDispatches.size() > 0;
+ return consumers.size() - browserSubscriptions.size() > 0;
}
private void doDispatch(PendingList list) throws Exception {
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index a8af5ae..0d7feba 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -404,7 +404,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
}
private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException {
- ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination));
+ ProxyMessageStore store = (ProxyMessageStore) storeCache.get(key(convert(activeMQDestination)));
if (store == null) {
if (activeMQDestination.isQueue()) {
store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination);
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 51475f9..5cd0474 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
-import javax.jms.Destination;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -558,7 +557,7 @@ public class MQTTProtocolConverter {
ActiveMQDestination destination;
synchronized (activeMQDestinationMap) {
- destination = activeMQDestinationMap.get(command.topicName());
+ destination = activeMQDestinationMap.get(command.topicName().toString());
if (destination == null) {
String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
try {