You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/31 12:48:57 UTC
[activemq-artemis] 03/04: ARTEMIS-2844 Improve binding query
performance by reusing AddressImpl instances
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 60e25b763ca6f9ee81cc85e459c7eb757c05f2bf
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Thu Jul 9 19:56:25 2020 +0200
ARTEMIS-2844 Improve binding query performance by reusing AddressImpl instances
---
.../artemis/utils/collections/IterableStream.java | 31 +++++++++++
.../management/impl/ActiveMQServerControlImpl.java | 2 +-
.../artemis/core/postoffice/AddressManager.java | 11 ++--
.../artemis/core/postoffice/PostOffice.java | 11 ++--
.../artemis/core/postoffice/impl/AddressImpl.java | 11 +++-
.../core/postoffice/impl/PostOfficeImpl.java | 43 +++++++--------
.../core/postoffice/impl/SimpleAddressManager.java | 54 +++++++++----------
.../federation/address/FederatedAddress.java | 4 +-
.../server/federation/queue/FederatedQueue.java | 2 -
.../core/server/impl/ActiveMQServerImpl.java | 26 +++++----
.../artemis/core/server/impl/QueueImpl.java | 63 ++++++++++------------
.../broker/artemiswrapper/RegionProxy.java | 20 +++----
.../largemessages/AMQPLargeMessagesTestUtil.java | 9 ++--
.../integration/mqtt/imported/MQTTFQQNTest.java | 11 ++--
.../tests/integration/mqtt/imported/MQTTTest.java | 8 +--
.../metrics/JournalPendingMessageTest.java | 12 ++---
.../tests/integration/server/ScaleDownTest.java | 10 ++--
.../artemis/tests/integration/stomp/StompTest.java | 8 +--
.../impl/WildcardAddressManagerUnitTest.java | 20 +++----
.../core/server/impl/fakes/FakePostOffice.java | 11 ++--
20 files changed, 191 insertions(+), 176 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IterableStream.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IterableStream.java
new file mode 100644
index 0000000..412a770
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IterableStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.stream.Stream;
+
+public final class IterableStream {
+
+ private IterableStream() {
+
+ }
+
+ public static <T> Iterable<T> iterableOf(Stream<T> stream) {
+ return stream::iterator;
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 8b608ac..9084beb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2184,7 +2184,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address)).getBindings()) {
+ for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
for (Consumer consumer : queue.getConsumers()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index c8c0428..fb7f55e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -16,12 +16,13 @@
*/
package org.apache.activemq.artemis.core.postoffice;
+import java.util.Collection;
import java.util.EnumSet;
-import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -45,9 +46,9 @@ public interface AddressManager {
Bindings getBindingsForRoutingAddress(SimpleString address) throws Exception;
- Bindings getMatchingBindings(SimpleString address) throws Exception;
+ Collection<Binding> getMatchingBindings(SimpleString address) throws Exception;
- Bindings getDirectBindings(SimpleString address) throws Exception;
+ Collection<Binding> getDirectBindings(SimpleString address) throws Exception;
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
@@ -57,7 +58,7 @@ public interface AddressManager {
Binding getBinding(SimpleString queueName);
- Map<SimpleString, Binding> getBindings();
+ Stream<Binding> getBindings();
Set<SimpleString> getAddresses();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index b54448f..a03f7e3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -16,21 +16,22 @@
*/
package org.apache.activemq.artemis.core.postoffice;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -135,11 +136,11 @@ public interface PostOffice extends ActiveMQComponent {
Binding getBinding(SimpleString uniqueName);
- Bindings getMatchingBindings(SimpleString address) throws Exception;
+ Collection<Binding> getMatchingBindings(SimpleString address) throws Exception;
- Bindings getDirectBindings(SimpleString address) throws Exception;
+ Collection<Binding> getDirectBindings(SimpleString address) throws Exception;
- Map<SimpleString, Binding> getAllBindings();
+ Stream<Binding> getAllBindings();
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java
index aef2c10..12ef067 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -36,7 +37,7 @@ public class AddressImpl implements Address {
private final boolean containsWildCard;
- private final List<Address> linkedAddresses = new ArrayList<>();
+ private List<Address> linkedAddresses = null;
private final WildcardConfiguration wildcardConfiguration;
@@ -68,11 +69,14 @@ public class AddressImpl implements Address {
@Override
public List<Address> getLinkedAddresses() {
- return linkedAddresses;
+ return linkedAddresses == null ? Collections.emptyList() : linkedAddresses;
}
@Override
public void addLinkedAddress(final Address address) {
+ if (linkedAddresses == null) {
+ linkedAddresses = new ArrayList<>(1);
+ }
if (!linkedAddresses.contains(address)) {
linkedAddresses.add(address);
}
@@ -80,6 +84,9 @@ public class AddressImpl implements Address {
@Override
public void removeLinkedAddress(final Address actualAddress) {
+ if (linkedAddresses == null) {
+ return;
+ }
linkedAddresses.remove(actualAddress);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 04b5c9c..9e063fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -82,6 +82,7 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -97,6 +98,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
+import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/**
* This is the class that will make the routing to Queues and decide which consumer will get the messages
@@ -769,18 +773,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
server.callBrokerAddressPlugins(plugin -> plugin.beforeRemoveAddress(address));
}
- final Bindings bindingsForAddress = getDirectBindings(address);
+ final Collection<Binding> bindingsForAddress = getDirectBindings(address);
if (force) {
- for (Binding binding : bindingsForAddress.getBindings()) {
+ for (Binding binding : bindingsForAddress) {
if (binding instanceof QueueBinding) {
((QueueBinding)binding).getQueue().deleteQueue(true);
}
}
- } else {
- if (bindingsForAddress.getBindings().size() > 0) {
- throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
- }
+ } else if (!bindingsForAddress.isEmpty()) {
+ throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
}
managementService.unregisterAddress(address);
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
@@ -969,17 +971,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
- public Bindings getMatchingBindings(final SimpleString address) throws Exception {
+ public Collection<Binding> getMatchingBindings(final SimpleString address) throws Exception {
return addressManager.getMatchingBindings(address);
}
@Override
- public Bindings getDirectBindings(final SimpleString address) throws Exception {
+ public Collection<Binding> getDirectBindings(final SimpleString address) throws Exception {
return addressManager.getDirectBindings(address);
}
@Override
- public Map<SimpleString, Binding> getAllBindings() {
+ public Stream<Binding> getAllBindings() {
return addressManager.getBindings();
}
@@ -1731,7 +1733,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
- for (Queue queue : getLocalQueues()) {
+ for (Queue queue : iterableOf(getLocalQueues())) {
try {
queue.expireReferences();
} catch (Exception e) {
@@ -1753,11 +1755,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public void run() {
- for (Queue queue : getLocalQueues()) {
+ getLocalQueues().forEach(queue -> {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) {
QueueManagerImpl.performAutoDeleteQueue(server, queue);
}
- }
+ });
Set<SimpleString> addresses = addressManager.getAddresses();
@@ -1796,19 +1798,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
- private List<Queue> getLocalQueues() {
- Map<SimpleString, Binding> nameMap = addressManager.getBindings();
-
- List<Queue> queues = new ArrayList<>();
-
- for (Binding binding : nameMap.values()) {
- if (binding.getType() == BindingType.LOCAL_QUEUE) {
- Queue queue = (Queue) binding.getBindable();
-
- queues.add(queue);
- }
- }
- return queues;
+ private Stream<Queue> getLocalQueues() {
+ return addressManager.getBindings()
+ .filter(binding -> binding.getType() == BindingType.LOCAL_QUEUE)
+ .map(binding -> (Queue) binding.getBindable());
}
public static final class AddOperation implements TransactionOperation {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 35d3e69..965cd6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -16,15 +16,18 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -35,7 +38,6 @@ import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
@@ -59,10 +61,7 @@ public class SimpleAddressManager implements AddressManager {
*/
protected final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<>();
- /**
- * {@code HashMap<QueueName, Binding>}
- */
- private final ConcurrentMap<SimpleString, Binding> nameMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<SimpleString, Pair<Binding, Address>> nameMap = new ConcurrentHashMap<>();
private final BindingsFactory bindingsFactory;
@@ -87,7 +86,8 @@ public class SimpleAddressManager implements AddressManager {
@Override
public boolean addBinding(final Binding binding) throws Exception {
- if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null) {
+ final Pair<Binding, Address> bindingAddressPair = new Pair<>(binding, new AddressImpl(binding.getAddress(), wildcardConfiguration));
+ if (nameMap.putIfAbsent(binding.getUniqueName(), bindingAddressPair) != null) {
throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(binding);
}
@@ -100,15 +100,15 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception {
- final Binding binding = nameMap.remove(uniqueName);
+ final Pair<Binding, Address> binding = nameMap.remove(uniqueName);
if (binding == null) {
return null;
}
- removeBindingInternal(binding.getAddress(), uniqueName);
+ removeBindingInternal(binding.getA().getAddress(), uniqueName);
- return binding;
+ return binding.getA();
}
@Override
@@ -118,42 +118,40 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Binding getBinding(final SimpleString bindableName) {
- return nameMap.get(CompositeAddress.extractQueueName(bindableName));
+ final Pair<Binding, Address> bindingAddressPair = nameMap.get(CompositeAddress.extractQueueName(bindableName));
+ return bindingAddressPair == null ? null : bindingAddressPair.getA();
}
@Override
- public Map<SimpleString, Binding> getBindings() {
- return nameMap;
+ public Stream<Binding> getBindings() {
+ return nameMap.values().stream().map(pair -> pair.getA());
}
@Override
- public Bindings getMatchingBindings(final SimpleString address) throws Exception {
+ public Collection<Binding> getMatchingBindings(final SimpleString address) throws Exception {
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Address add = new AddressImpl(realAddress, wildcardConfiguration);
- Bindings bindings = bindingsFactory.createBindings(realAddress);
-
- for (Binding binding : nameMap.values()) {
- Address addCheck = new AddressImpl(binding.getAddress(), wildcardConfiguration);
-
+ Collection<Binding> bindings = new ArrayList<>();
+ nameMap.forEach((bindingUniqueName, bindingAddressPair) -> {
+ final Address addCheck = bindingAddressPair.getB();
if (addCheck.matches(add)) {
- bindings.addBinding(binding);
+ bindings.add(bindingAddressPair.getA());
}
- }
-
+ });
return bindings;
}
@Override
- public Bindings getDirectBindings(final SimpleString address) throws Exception {
+ public Collection<Binding> getDirectBindings(final SimpleString address) throws Exception {
SimpleString realAddress = CompositeAddress.extractAddressName(address);
- Bindings bindings = bindingsFactory.createBindings(realAddress);
+ Collection<Binding> bindings = new ArrayList<>();
- for (Binding binding : nameMap.values()) {
- if (binding.getAddress().equals(realAddress)) {
- bindings.addBinding(binding);
+ nameMap.forEach((bindingUniqueName, bindingAddressPair) -> {
+ if (bindingAddressPair.getA().getAddress().equals(realAddress)) {
+ bindings.add(bindingAddressPair.getA());
}
- }
+ });
return bindings;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
index 819fa3d..a7218ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
@@ -111,8 +111,6 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
super.start();
server.getPostOffice()
.getAllBindings()
- .values()
- .stream()
.filter(b -> b instanceof QueueBinding || b instanceof DivertBinding)
.forEach(this::afterAddBinding);
}
@@ -144,7 +142,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
//if a new address is added we need to see if there are matching divert bindings
server.getPostOffice()
.getDirectBindings(addressInfo.getName())
- .getBindings().stream().filter(binding -> binding instanceof DivertBinding)
+ .stream().filter(binding -> binding instanceof DivertBinding)
.forEach(this::afterAddBinding);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, addressInfo.getName());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java
index 7953625..33df3b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java
@@ -94,8 +94,6 @@ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerC
super.start();
server.getPostOffice()
.getAllBindings()
- .values()
- .stream()
.filter(b -> b instanceof QueueBinding)
.map(b -> (QueueBinding) b)
.forEach(b -> conditionalCreateRemoteConsumer(b.getQueue()));
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 308eb0c..25685e3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -27,6 +27,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@@ -100,7 +101,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -204,6 +204,8 @@ import org.apache.activemq.artemis.utils.critical.CriticalComponent;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
+import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
+
/**
* The ActiveMQ Artemis server implementation
*/
@@ -910,19 +912,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
int defaultConsumersBeforeDispatch = addressSettings.getDefaultConsumersBeforeDispatch();
long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch();
- List<SimpleString> names = new ArrayList<>();
-
// make an exception for the management address (see HORNETQ-29)
ManagementService managementService = getManagementService();
if (managementService != null) {
if (realAddress.equals(managementService.getManagementAddress())) {
- return new BindingQueryResult(true, null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch);
+ return new BindingQueryResult(true, null, Collections.emptyList(), autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch);
}
}
- Bindings bindings = getPostOffice().getMatchingBindings(realAddress);
+ List<SimpleString> names = new ArrayList<>();
- for (Binding binding : bindings.getBindings()) {
+ for (Binding binding : getPostOffice().getMatchingBindings(realAddress)) {
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
SimpleString name;
if (!newFQQN && CompositeAddress.isFullyQualified(address.toString())) {
@@ -1594,11 +1594,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
public int getQueueCountForUser(String username) throws Exception {
- Map<SimpleString, Binding> bindings = postOffice.getAllBindings();
-
int queuesForUser = 0;
- for (Binding binding : bindings.values()) {
+ for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding instanceof LocalQueueBinding && ((LocalQueueBinding) binding).getQueue().getUser().equals(SimpleString.toSimpleString(username))) {
queuesForUser++;
}
@@ -1699,7 +1697,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalMessageCount() {
long total = 0;
- for (Binding binding : postOffice.getAllBindings().values()) {
+ for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getMessageCount();
}
@@ -1712,7 +1710,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalMessagesAdded() {
long total = 0;
- for (Binding binding : postOffice.getAllBindings().values()) {
+ for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getMessagesAdded();
}
@@ -1725,7 +1723,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalMessagesAcknowledged() {
long total = 0;
- for (Binding binding : postOffice.getAllBindings().values()) {
+ for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getMessagesAcknowledged();
}
@@ -1738,7 +1736,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalConsumerCount() {
long total = 0;
- for (Binding binding : postOffice.getAllBindings().values()) {
+ for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getConsumerCount();
}
@@ -4023,7 +4021,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
addressSettingsRepository.swap(configuration.getAddressesSettings().entrySet());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
- final Set<SimpleString> divertsToRemove = postOffice.getAllBindings().values().stream()
+ final Set<SimpleString> divertsToRemove = postOffice.getAllBindings()
.filter(binding -> binding instanceof DivertBinding)
.map(Binding::getUniqueName)
.collect(Collectors.toSet());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7e88817..9105fc2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -114,6 +114,8 @@ import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
import org.jctools.queues.MpscUnboundedArrayQueue;
+import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
+
/**
* Implementation of a Queue
* <p>
@@ -3320,42 +3322,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
String targetNodeID = null;
Binding targetBinding = null;
- for (Map.Entry<SimpleString, Binding> entry : postOffice.getAllBindings().entrySet()) {
- Binding binding = entry.getValue();
-
- // we only care about the remote queue bindings
- if (binding instanceof RemoteQueueBinding) {
- RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) binding;
-
- // does this remote queue binding point to the same queue as the message?
- if (oldQueueID == remoteQueueBinding.getRemoteQueueID()) {
- // get the name of this queue so we can find the corresponding remote queue binding pointing to the scale down target node
- SimpleString oldQueueName = remoteQueueBinding.getRoutingName();
-
- // parse the queue name of the remote queue binding to determine the node ID
- String temp = remoteQueueBinding.getQueue().getName().toString();
+ // we only care about the remote queue bindings
+ for (RemoteQueueBinding remoteQueueBinding : iterableOf(postOffice.getAllBindings()
+ .filter(RemoteQueueBinding.class::isInstance)
+ .map(RemoteQueueBinding.class::cast))) {
+ // does this remote queue binding point to the same queue as the message?
+ if (oldQueueID == remoteQueueBinding.getRemoteQueueID()) {
+ // get the name of this queue so we can find the corresponding remote queue binding pointing to the scale down target node
+ SimpleString oldQueueName = remoteQueueBinding.getRoutingName();
+
+ // parse the queue name of the remote queue binding to determine the node ID
+ String temp = remoteQueueBinding.getQueue().getName().toString();
+ targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
+ logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID);
+
+ // now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding
+ // again, we only care about the remote queue bindings
+ for (RemoteQueueBinding innerRemoteQueueBinding : iterableOf(postOffice.getAllBindings()
+ .filter(RemoteQueueBinding.class::isInstance)
+ .map(RemoteQueueBinding.class::cast))) {
+ temp = innerRemoteQueueBinding.getQueue().getName().toString();
targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
- logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID);
-
- // now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding
- for (Map.Entry<SimpleString, Binding> entry2 : postOffice.getAllBindings().entrySet()) {
- binding = entry2.getValue();
-
- // again, we only care about the remote queue bindings
- if (binding instanceof RemoteQueueBinding) {
- remoteQueueBinding = (RemoteQueueBinding) binding;
- temp = remoteQueueBinding.getQueue().getName().toString();
- targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
- if (oldQueueName.equals(remoteQueueBinding.getRoutingName()) && targetNodeID.equals(queueSuffix.toString())) {
- targetBinding = remoteQueueBinding;
- if (logger.isDebugEnabled()) {
- logger.debug("Message now destined for " + remoteQueueBinding.getRoutingName() + " with ID: " + remoteQueueBinding.getRemoteQueueID() + " on address " + copyMessage.getAddress() + " on node " + targetNodeID);
- }
- break;
- } else {
- logger.debug("Failed to match: " + remoteQueueBinding);
- }
+ if (oldQueueName.equals(innerRemoteQueueBinding.getRoutingName()) && targetNodeID.equals(queueSuffix.toString())) {
+ targetBinding = innerRemoteQueueBinding;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message now destined for " + innerRemoteQueueBinding.getRoutingName() + " with ID: " + innerRemoteQueueBinding.getRemoteQueueID() + " on address " + copyMessage.getAddress() + " on node " + targetNodeID);
}
+ break;
+ } else {
+ logger.debug("Failed to match: " + innerRemoteQueueBinding);
}
}
}
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java
index 0b97ffc..259da32 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.broker.artemiswrapper;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -44,10 +48,6 @@ import org.apache.activemq.command.Response;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
public class RegionProxy implements Region {
private final ActiveMQServer server;
private final RoutingType routingType;
@@ -77,21 +77,21 @@ public class RegionProxy implements Region {
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() {
- return server.getPostOffice().getAllBindings().entrySet().stream()
- .filter(e -> e.getValue() instanceof QueueBinding)
+ return server.getPostOffice().getAllBindings()
+ .filter(QueueBinding.class::isInstance)
.filter(e -> {
- final SimpleString address = ((QueueBinding) e.getValue()).getQueue().getAddress();
+ final SimpleString address = ((QueueBinding) e).getQueue().getAddress();
return server.getAddressInfo(address).getRoutingType() == routingType;
}
)
.collect(Collectors.toMap(
e -> {
- final String uniqueName = e.getValue().getUniqueName().toString();
+ final String uniqueName = e.getUniqueName().toString();
return new ActiveMQQueue(uniqueName);
},
e -> {
- final Queue queue = ((QueueBinding) e.getValue()).getQueue();
- final String address = e.getValue().getAddress().toString();
+ final Queue queue = ((QueueBinding) e).getQueue();
+ final String address = e.getAddress().toString();
return new DestinationProxy(queue, address, server);
}));
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java
index 3795046..704cd44 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
import java.util.NoSuchElementException;
-import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -33,11 +32,9 @@ public class AMQPLargeMessagesTestUtil {
public static void validateAllTemporaryBuffers(ActiveMQServer server) {
- for (Binding binding : server.getPostOffice().getAllBindings().values()) {
- if (binding instanceof QueueBinding) {
- validateTemporaryBuffers(((QueueBinding)binding).getQueue());
- }
- }
+ server.getPostOffice().getAllBindings()
+ .filter(QueueBinding.class::isInstance)
+ .forEach(binding -> validateTemporaryBuffers(((QueueBinding) binding).getQueue()));
}
public static void validateTemporaryBuffers(Queue serverQueue) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
index 5324590..9133d5d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -39,9 +38,8 @@ public class MQTTFQQNTest extends MQTTTestSupport {
try {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
- Map<SimpleString, Binding> allBindings = server.getPostOffice().getAllBindings();
- assertEquals(1, allBindings.size());
- Binding b = allBindings.values().iterator().next();
+ assertEquals(1, server.getPostOffice().getAllBindings().count());
+ Binding b = server.getPostOffice().getAllBindings().iterator().next();
//check that query using bare queue name works as before
QueueQueryResult result = server.queueQuery(b.getUniqueName());
assertTrue(result.isExists());
@@ -132,9 +130,8 @@ public class MQTTFQQNTest extends MQTTTestSupport {
try {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
- Map<SimpleString, Binding> allBindings = server.getPostOffice().getAllBindings();
- assertEquals(1, allBindings.size());
- Binding b = allBindings.values().iterator().next();
+ assertEquals(1, server.getPostOffice().getAllBindings().count());
+ Binding b = server.getPostOffice().getAllBindings().iterator().next();
//check ::queue
QueueQueryResult result = server.queueQuery(new SimpleString("::" + b.getUniqueName()));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index e04c953..7187dbc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -69,6 +69,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
+
/**
* MQTT Test imported from ActiveMQ MQTT component.
*/
@@ -149,10 +151,8 @@ public class MQTTTest extends MQTTTestSupport {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
- for (Binding b : server.getPostOffice().getAllBindings().values()) {
- if (b instanceof QueueBinding) {
- Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
- }
+ for (Binding b : iterableOf(server.getPostOffice().getAllBindings().filter(QueueBinding.class::isInstance))) {
+ Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
subscriptionProvider.disconnect();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
index 885b428..db9be9e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
@@ -16,11 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence.metrics;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.ToLongFunction;
-
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
@@ -29,6 +24,10 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.ToLongFunction;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -493,8 +492,7 @@ public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport
protected List<Queue> getQueues(final String address) throws Exception {
final List<Queue> queues = new ArrayList<>();
- for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))
- .getBindings()) {
+ for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
LocalQueueBinding queueBinding = (LocalQueueBinding) binding;
queues.add(queueBinding.getQueue());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index 9f8c47f..c8a0661 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -46,15 +46,17 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
+
@RunWith(value = Parameterized.class)
public class ScaleDownTest extends ClusterTestBase {
@@ -243,13 +245,13 @@ public class ScaleDownTest extends ClusterTestBase {
// find and pause the sf queue so no messages actually move from node 0 to node 1
String sfQueueName = null;
- for (Map.Entry<SimpleString, Binding> entry : servers[0].getPostOffice().getAllBindings().entrySet()) {
- String temp = entry.getValue().getAddress().toString();
+ for (Binding binding : iterableOf(servers[0].getPostOffice().getAllBindings())) {
+ String temp = binding.getAddress().toString();
if (temp.startsWith(servers[1].getInternalNamingPrefix() + "sf.") && temp.endsWith(servers[1].getNodeID().toString())) {
// we found the sf queue for the other node
// need to pause the sfQueue here
- ((LocalQueueBinding) entry.getValue()).getQueue().pause();
+ ((LocalQueueBinding) binding).getQueue().pause();
sfQueueName = temp;
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 9a2786c..648f6d1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -72,6 +72,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
+
@RunWith(Parameterized.class)
public class StompTest extends StompTestBase {
@@ -2026,10 +2028,8 @@ public class StompTest extends StompTestBase {
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
- for (Binding b : server.getPostOffice().getAllBindings().values()) {
- if (b instanceof QueueBinding) {
- Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
- }
+ for (Binding b : iterableOf(server.getPostOffice().getAllBindings().filter(QueueBinding.class::isInstance))) {
+ Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
// Send MQTT Message
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
index ecd02f5..8375ae8 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
@@ -151,21 +151,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
- assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
- assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
+ assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).size());
+ assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).size());
//Remove the address
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.test"));
//should still have 1 address and binding
assertEquals(1, ad.getAddresses().size());
- assertEquals(1, ad.getBindings().size());
+ assertEquals(1, ad.getBindings().count());
ad.removeBinding(SimpleString.toSimpleString("one"), null);
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.>"));
assertEquals(0, ad.getAddresses().size());
- assertEquals(0, ad.getBindings().size());
+ assertEquals(0, ad.getBindings().count());
}
@Test
@@ -189,12 +189,12 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test1")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test2")).getBindings().size());
- assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
- assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
- assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).getBindings().size());
- assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).getBindings().size());
- assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).getBindings().size());
- assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).getBindings().size());
+ assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).size());
+ assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).size());
+ assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).size());
+ assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).size());
+ assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).size());
+ assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).size());
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 274c50d..0270614 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -16,14 +16,16 @@
*/
package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
@@ -37,7 +39,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@@ -177,7 +178,7 @@ public class FakePostOffice implements PostOffice {
}
@Override
- public Map<SimpleString, Binding> getAllBindings() {
+ public Stream<Binding> getAllBindings() {
return null;
}
@@ -193,13 +194,13 @@ public class FakePostOffice implements PostOffice {
}
@Override
- public Bindings getMatchingBindings(final SimpleString address) {
+ public Collection<Binding> getMatchingBindings(final SimpleString address) {
return null;
}
@Override
- public Bindings getDirectBindings(final SimpleString address) {
+ public Collection<Binding> getDirectBindings(final SimpleString address) {
return null;
}