You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/09/12 15:19:34 UTC
[3/3] qpid-broker-j git commit: QPID-7771: [Java Broker] Add support for binding argument 'x-replacement-routing-key'
QPID-7771: [Java Broker] Add support for binding argument 'x-replacement-routing-key'
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1abc9359
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1abc9359
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1abc9359
Branch: refs/heads/master
Commit: 1abc93597c5b2ff934fa7febee459a732791e3fa
Parents: 6d7ac36
Author: Keith Wall <kw...@apache.org>
Authored: Tue Sep 5 16:17:29 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Sep 12 16:17:27 2017 +0100
----------------------------------------------------------------------
.../server/exchange/DirectExchangeImpl.java | 165 +++--
.../server/exchange/FanoutExchangeImpl.java | 269 +++----
...FilterManagerReplacementRoutingKeyTuple.java | 46 ++
.../qpid/server/exchange/HeadersBinding.java | 21 +-
.../server/exchange/HeadersExchangeImpl.java | 73 +-
.../qpid/server/exchange/TopicExchangeImpl.java | 193 +++--
.../exchange/topic/TopicExchangeResult.java | 132 ++--
.../qpid/server/exchange/topic/TopicParser.java | 48 +-
.../qpid/server/message/RoutingResult.java | 11 +
.../org/apache/qpid/server/model/Binding.java | 1 +
.../org/apache/qpid/server/model/Exchange.java | 3 +-
.../server/exchange/DirectExchangeTest.java | 218 +++++-
.../server/exchange/FanoutExchangeTest.java | 471 ++++++------
.../server/exchange/HeadersExchangeTest.java | 401 +++++-----
.../qpid/server/exchange/TopicExchangeTest.java | 731 ++++++++++---------
.../server/routing/ExchangeRoutingTest.java | 173 +++++
16 files changed, 1603 insertions(+), 1353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
index 6c4be90..7ce9c37 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.exchange;
+import static org.apache.qpid.server.model.Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY;
+
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -40,7 +40,6 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -51,45 +50,46 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
private final class BindingSet
{
- private final Set<MessageDestination> _unfilteredQueues;
- private final Map<MessageDestination, FilterManager> _filteredQueues;
+ private final Map<MessageDestination, String> _unfilteredDestinations;
+ private final Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> _filteredDestinations;
- public BindingSet()
+ BindingSet()
{
- _unfilteredQueues = Collections.emptySet();
- _filteredQueues = Collections.emptyMap();
+ _unfilteredDestinations = Collections.emptyMap();
+ _filteredDestinations = Collections.emptyMap();
}
- private BindingSet(final Set<MessageDestination> unfilteredQueues,
- final Map<MessageDestination, FilterManager> filteredQueues)
+ private BindingSet(final Map<MessageDestination, String> unfilteredDestinations,
+ final Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations)
{
- _unfilteredQueues = unfilteredQueues;
- _filteredQueues = filteredQueues;
+ _unfilteredDestinations = unfilteredDestinations;
+ _filteredDestinations = filteredDestinations;
}
- public Set<MessageDestination> getUnfilteredQueues()
+ Map<MessageDestination, String> getUnfilteredDestinations()
{
- return _unfilteredQueues;
+ return _unfilteredDestinations;
}
- public boolean hasFilteredQueues()
+ boolean hasFilteredQueues()
{
- return !_filteredQueues.isEmpty();
+ return !_filteredDestinations.isEmpty();
}
boolean isEmpty()
{
- return _unfilteredQueues.isEmpty() && _filteredQueues.isEmpty();
+ return _unfilteredDestinations.isEmpty() && _filteredDestinations.isEmpty();
}
- public Map<MessageDestination,FilterManager> getFilteredQueues()
+ Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> getFilteredDestinations()
{
- return _filteredQueues;
+ return _filteredDestinations;
}
BindingSet putBinding(MessageDestination destination, Map<String, Object> arguments, boolean force)
{
- if(!force && (_unfilteredQueues.contains(destination) || _filteredQueues.containsKey(destination)))
+ if (!force && (_unfilteredDestinations.containsKey(destination) || _filteredDestinations.containsKey(
+ destination)))
{
return this;
}
@@ -97,72 +97,81 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
{
try
{
- Set<MessageDestination> unfilteredQueues;
- Map<MessageDestination, FilterManager> filteredQueues;
- if (_unfilteredQueues.contains(destination))
+ Map<MessageDestination, String> unfilteredDestinations;
+ Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+ if (_unfilteredDestinations.containsKey(destination))
{
- unfilteredQueues = new HashSet<>(_unfilteredQueues);
- unfilteredQueues.remove(destination);
+ unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+ unfilteredDestinations.remove(destination);
}
else
{
- unfilteredQueues = _unfilteredQueues;
+ unfilteredDestinations = _unfilteredDestinations;
}
- filteredQueues = new HashMap<>(_filteredQueues);
- filteredQueues.put(destination,
- FilterSupport.createMessageFilter(arguments, (Queue<?>) destination));
+ filteredDestinations = new HashMap<>(_filteredDestinations);
+ FilterManager messageFilter = FilterSupport.createMessageFilter(arguments, destination);
+ String replacementRoutingKey = arguments.containsKey(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY)
+ ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+ : null;
+ filteredDestinations.put(destination,
+ new FilterManagerReplacementRoutingKeyTuple(messageFilter,
+ replacementRoutingKey));
- return new BindingSet(Collections.unmodifiableSet(unfilteredQueues), Collections.unmodifiableMap(filteredQueues));
+ return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations),
+ Collections.unmodifiableMap(filteredDestinations));
}
catch (AMQInvalidArgumentException e)
{
- _logger.warn("Binding ignored: cannot parse filter on binding of queue '" + destination.getName()
- + "' to exchange '" + DirectExchangeImpl.this.getName()
- + "' with arguments: " + arguments, e);
+ _logger.warn(
+ "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
+ destination.getName(),
+ DirectExchangeImpl.this.getName(),
+ arguments,
+ e);
return this;
}
}
else
{
- Set<MessageDestination> unfilteredQueues;
- Map<MessageDestination, FilterManager> filteredQueues;
- if (_filteredQueues.containsKey(destination))
+ Map<MessageDestination, String> unfilteredDestinations;
+ Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+ if (_filteredDestinations.containsKey(destination))
{
- filteredQueues = new HashMap<>(_filteredQueues);
- filteredQueues.remove(destination);
+ filteredDestinations = new HashMap<>(_filteredDestinations);
+ filteredDestinations.remove(destination);
}
else
{
- filteredQueues = _filteredQueues;
+ filteredDestinations = _filteredDestinations;
}
- unfilteredQueues = new HashSet<>(_unfilteredQueues);
- unfilteredQueues.add(destination);
-
- return new BindingSet(Collections.unmodifiableSet(unfilteredQueues), Collections.unmodifiableMap(filteredQueues));
+ unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+ Object replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY);
+ unfilteredDestinations.put(destination, replacementRoutingKey == null ? null : String.valueOf(replacementRoutingKey));
+ return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), Collections.unmodifiableMap(filteredDestinations));
}
}
- public BindingSet removeBinding(final MessageDestination destination)
+ BindingSet removeBinding(final MessageDestination destination)
{
- Set<MessageDestination> unfilteredQueues;
- Map<MessageDestination, FilterManager> filteredQueues;
- if (_unfilteredQueues.contains(destination))
+ Map<MessageDestination, String> unfilteredDestinations;
+ Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+ if (_unfilteredDestinations.containsKey(destination))
{
- unfilteredQueues = new HashSet<>(_unfilteredQueues);
- unfilteredQueues.remove(destination);
+ unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+ unfilteredDestinations.remove(destination);
- return new BindingSet(Collections.unmodifiableSet(unfilteredQueues),_filteredQueues);
+ return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), _filteredDestinations);
}
- else if(_filteredQueues.containsKey(destination))
+ else if(_filteredDestinations.containsKey(destination))
{
- filteredQueues = new HashMap<>(_filteredQueues);
- filteredQueues.remove(destination);
- return new BindingSet(_unfilteredQueues, Collections.unmodifiableMap(filteredQueues));
+ filteredDestinations = new HashMap<>(_filteredDestinations);
+ filteredDestinations.remove(destination);
+ return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
}
else
{
@@ -172,52 +181,54 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
}
}
- private final ConcurrentMap<String, BindingSet> _bindingsByKey =
- new ConcurrentHashMap<String, BindingSet>();
+ private final ConcurrentMap<String, BindingSet> _bindingsByKey = new ConcurrentHashMap<>();
@ManagedObjectFactoryConstructor
- public DirectExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
+ DirectExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
{
super(attributes, vhost);
}
@Override
- public <M extends ServerMessage<? extends StorableMessageMetaData>> void doRoute(final M payload,
- final String routingKey,
- final InstanceProperties instanceProperties,
- final RoutingResult<M> result)
+ public <M extends ServerMessage<? extends StorableMessageMetaData>> void doRoute(final M payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties,
+ final RoutingResult<M> result)
{
-
BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
-
- if(bindings != null)
+ if (bindings != null)
{
- final Set<MessageDestination> unfilteredQueues = bindings.getUnfilteredQueues();
- for(MessageDestination destination : unfilteredQueues)
+ final Map<MessageDestination, String> unfilteredDestinations = bindings.getUnfilteredDestinations();
+ for (MessageDestination destination : unfilteredDestinations.keySet())
{
- result.add(destination.route(payload, routingKey, instanceProperties));
+ String actualRoutingKey = unfilteredDestinations.get(destination) == null
+ ? routingKey
+ : unfilteredDestinations.get(destination);
+ result.add(destination.route(payload, actualRoutingKey, instanceProperties));
}
- if(bindings.hasFilteredQueues())
+ if (bindings.hasFilteredQueues())
{
Filterable filterable = Filterable.Factory.newInstance(payload, instanceProperties);
- Map<MessageDestination, FilterManager> filteredQueues = bindings.getFilteredQueues();
- for(Map.Entry<MessageDestination, FilterManager> entry : filteredQueues.entrySet())
+ Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations =
+ bindings.getFilteredDestinations();
+ for (Map.Entry<MessageDestination, FilterManagerReplacementRoutingKeyTuple> entry : filteredDestinations
+ .entrySet())
{
- if(!unfilteredQueues.contains(entry.getKey()))
+ FilterManagerReplacementRoutingKeyTuple tuple = entry.getValue();
+ String actualRoutingKey = tuple.getReplacementRoutingKey() == null
+ ? routingKey
+ : tuple.getReplacementRoutingKey();
+
+ if (tuple.getFilterManager().allAllow(filterable))
{
- FilterManager filter = entry.getValue();
- if(filter.allAllow(filterable))
- {
- result.add(entry.getKey().route(payload, routingKey, instanceProperties));
- }
+ result.add(entry.getKey().route(payload, actualRoutingKey, instanceProperties));
}
}
}
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
index d6864d4..683d28b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
+import static org.apache.qpid.server.model.Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY;
+
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +40,6 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -46,169 +47,122 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
{
private static final Logger _logger = LoggerFactory.getLogger(FanoutExchangeImpl.class);
- private static final Integer ONE = Integer.valueOf(1);
-
private final class BindingSet
{
- private final Map<MessageDestination,Integer> _queues;
-
- private final List<MessageDestination> _unfilteredQueues;
- private final List<MessageDestination> _filteredQueues;
+ private final Map<MessageDestination, Map<BindingIdentifier, String>> _unfilteredDestinations;
+ private final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+ _filteredDestinations;
- private final Map<MessageDestination,Map<BindingIdentifier, FilterManager>> _filteredBindings;
-
- public BindingSet(final Map<MessageDestination, Integer> queues,
- final List<MessageDestination> unfilteredQueues,
- final List<MessageDestination> filteredQueues,
- final Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings)
+ BindingSet(final Map<MessageDestination, Map<BindingIdentifier, String>> unfilteredDestinations,
+ final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>> filteredDestinations)
{
- _queues = queues;
- _unfilteredQueues = unfilteredQueues;
- _filteredQueues = filteredQueues;
- _filteredBindings = filteredBindings;
+ _unfilteredDestinations = unfilteredDestinations;
+ _filteredDestinations = filteredDestinations;
}
- public BindingSet()
+ BindingSet()
{
- _queues = Collections.emptyMap();
- _unfilteredQueues = Collections.emptyList();
- _filteredQueues = Collections.emptyList();
- _filteredBindings = Collections.emptyMap();
+ _unfilteredDestinations = Collections.emptyMap();
+ _filteredDestinations = Collections.emptyMap();
}
- public BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
+ BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
{
- if(FilterSupport.argumentsContainFilter(arguments))
+ MessageDestination destination = binding.getDestination();
+ if (FilterSupport.argumentsContainFilter(arguments))
+ {
+ try
{
- try
- {
- List<MessageDestination> filteredQueues;
- if (!(_filteredQueues.contains(binding.getDestination())
- || _unfilteredQueues.contains(binding.getDestination())))
- {
- filteredQueues = new ArrayList<>(_filteredQueues);
- filteredQueues.add(binding.getDestination());
- filteredQueues = Collections.unmodifiableList(filteredQueues);
- }
- else
- {
- filteredQueues = _filteredQueues;
- }
- Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings =
- new HashMap<>(_filteredBindings);
- Map<BindingIdentifier, FilterManager> bindingsForQueue =
- filteredBindings.get(binding.getDestination());
- if (bindingsForQueue == null)
- {
- bindingsForQueue = new HashMap<>();
- }
- else
- {
- bindingsForQueue = new HashMap<>(bindingsForQueue);
- }
- bindingsForQueue.put(binding,
- FilterSupport.createMessageFilter(arguments,
- binding.getDestination()));
- filteredBindings.put(binding.getDestination(), bindingsForQueue);
- return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
- }
- catch (AMQInvalidArgumentException e)
- {
- _logger.warn("Binding ignored: cannot parse filter on binding of queue '" + binding.getDestination().getName()
- + "' to exchange '" + FanoutExchangeImpl.this.getName()
- + "' with arguments: " + arguments, e);
- return this;
- }
+ Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+ filteredDestinations = new HashMap<>(_filteredDestinations);
+
+ filteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
+
+ Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination =
+ new HashMap<>(filteredDestinations.get(destination));
+
+ FilterManager filterManager = FilterSupport.createMessageFilter(arguments, destination);
+ String replacementRoutingKey = arguments.containsKey(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY)
+ ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+ : null;
+
+ bindingsForDestination.put(binding,
+ new FilterManagerReplacementRoutingKeyTuple(filterManager,
+ replacementRoutingKey));
+ filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
+ return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
}
- else
+ catch (AMQInvalidArgumentException e)
{
- Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
- List<MessageDestination> unfilteredQueues;
- List<MessageDestination> filteredQueues;
- if (queues.containsKey(binding.getDestination()))
- {
- queues.put(binding.getDestination(), queues.get(binding.getDestination()) + 1);
- unfilteredQueues = _unfilteredQueues;
- filteredQueues = _filteredQueues;
- }
- else
- {
- queues.put(binding.getDestination(), ONE);
- unfilteredQueues = new ArrayList<>(_unfilteredQueues);
- unfilteredQueues.add((Queue<?>)binding.getDestination());
- unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
- if(_filteredQueues.contains(binding.getDestination()))
- {
- filteredQueues = new ArrayList<>(_filteredQueues);
- filteredQueues.remove(binding.getDestination());
- filteredQueues = Collections.unmodifiableList(filteredQueues);
- }
- else
- {
- filteredQueues = _filteredQueues;
- }
- }
- return new BindingSet(queues, unfilteredQueues, filteredQueues, _filteredBindings);
+ _logger.warn(
+ "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
+ destination.getName(),
+ FanoutExchangeImpl.this.getName(),
+ arguments,
+ e);
+ return this;
+ }
+ }
+ else
+ {
+ Map<MessageDestination, Map<BindingIdentifier, String>> unfilteredDestinations =
+ new HashMap<>(_unfilteredDestinations);
+ unfilteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
+
+ String replacementRoutingKey = null;
+ if (arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null)
+ {
+ replacementRoutingKey = String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY));
}
+
+ Map<BindingIdentifier, String> replacementRoutingKeysForDestination =
+ new HashMap<>(unfilteredDestinations.get(destination));
+ replacementRoutingKeysForDestination.put(binding, replacementRoutingKey);
+
+ unfilteredDestinations.put(destination,
+ Collections.unmodifiableMap(replacementRoutingKeysForDestination));
+ return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), _filteredDestinations);
+ }
}
- public BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
+ BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
{
-
return removeBinding(binding).addBinding(binding, newArguments);
}
- public BindingSet removeBinding(final BindingIdentifier binding)
+ BindingSet removeBinding(final BindingIdentifier binding)
{
- Queue<?> queue = (Queue<?>) binding.getDestination();
- if(_filteredBindings.containsKey(queue) && _filteredBindings.get(queue).containsKey(binding))
+ MessageDestination destination = binding.getDestination();
+ if(_filteredDestinations.containsKey(destination) && _filteredDestinations.get(destination).containsKey(binding))
{
- final Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings = new HashMap<>(_filteredBindings);
- final Map<BindingIdentifier, FilterManager> bindingsForQueue = new HashMap<>(filteredBindings.remove(queue));
- bindingsForQueue.remove(binding);
- List<MessageDestination> filteredQueues;
- if(bindingsForQueue.isEmpty())
+ final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>> filteredDestinations = new HashMap<>(_filteredDestinations);
+ final Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination = new HashMap<>(filteredDestinations.get(destination));
+ bindingsForDestination.remove(binding);
+ if (bindingsForDestination.isEmpty())
{
- filteredQueues = new ArrayList<>(_filteredQueues);
- filteredQueues.remove(queue);
- filteredQueues = Collections.unmodifiableList(filteredQueues);
+ filteredDestinations.remove(destination);
}
else
{
- filteredBindings.put(queue, bindingsForQueue);
- filteredQueues = _filteredQueues;
+ filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
}
- return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
+ return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
}
- else if(_unfilteredQueues.contains(queue))
+ else if(_unfilteredDestinations.containsKey(destination) && _unfilteredDestinations.get(destination).containsKey(binding))
{
- Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
- int count = queues.remove(queue);
- List<MessageDestination> unfilteredQueues;
- List<MessageDestination> filteredQueues;
- if(count > 1)
+ Map<MessageDestination, Map<BindingIdentifier, String>> unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+ final Map<BindingIdentifier, String> bindingsForDestination = new HashMap<>(unfilteredDestinations.get(destination));
+ bindingsForDestination.remove(binding);
+ if (bindingsForDestination.isEmpty())
{
- queues.put(queue, --count);
- unfilteredQueues = _unfilteredQueues;
- filteredQueues = _filteredQueues;
+ unfilteredDestinations.remove(destination);
}
else
{
- unfilteredQueues = new ArrayList<>(_unfilteredQueues);
- unfilteredQueues.remove(queue);
- unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
- if(_filteredBindings.containsKey(queue))
- {
- filteredQueues = new ArrayList<>(_filteredQueues);
- filteredQueues.add(queue);
- filteredQueues = Collections.unmodifiableList(filteredQueues);
- }
- else
- {
- filteredQueues = _filteredQueues;
- }
+ unfilteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
}
- return new BindingSet(Collections.unmodifiableMap(queues), unfilteredQueues, filteredQueues, _filteredBindings);
+
+ return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), _filteredDestinations);
}
else
{
@@ -217,12 +171,8 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
}
}
-
private volatile BindingSet _bindingSet = new BindingSet();
- /**
- * Maps from queue name to queue instances
- */
@ManagedObjectFactoryConstructor
public FanoutExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
@@ -237,32 +187,47 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
final RoutingResult<M> result)
{
BindingSet bindingSet = _bindingSet;
- for(MessageDestination destination : bindingSet._unfilteredQueues)
+
+ if (!bindingSet._unfilteredDestinations.isEmpty())
{
- result.add(destination.route(message, routingAddress, instanceProperties));
+ for (MessageDestination destination : bindingSet._unfilteredDestinations.keySet())
+ {
+ Set<String> replacementRoutingKeys =
+ new HashSet<>(bindingSet._unfilteredDestinations.get(destination).values());
+
+ replacementRoutingKeys.forEach(
+ replacementRoutingKey -> result.add(destination.route(message,
+ replacementRoutingKey == null
+ ? routingAddress
+ : replacementRoutingKey,
+ instanceProperties)));
+ }
}
- final Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings = bindingSet._filteredBindings;
- if(!bindingSet._filteredQueues.isEmpty())
+
+ final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+ filteredDestinations = bindingSet._filteredDestinations;
+ if (!filteredDestinations.isEmpty())
{
- for(MessageDestination q : bindingSet._filteredQueues)
+ for (Map.Entry<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>> entry :
+ filteredDestinations.entrySet())
{
- final Map<BindingIdentifier, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
- if(!(bindingMessageFilterMap == null || bindingSet._unfilteredQueues.contains(q)))
+ MessageDestination destination = entry.getKey();
+ final Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingMessageFilterMap =
+ entry.getValue();
+ for (FilterManagerReplacementRoutingKeyTuple tuple : bindingMessageFilterMap.values())
{
- for(FilterManager filter : bindingMessageFilterMap.values())
+
+ FilterManager filter = tuple.getFilterManager();
+ if (filter.allAllow(Filterable.Factory.newInstance(message, instanceProperties)))
{
- if(filter.allAllow(Filterable.Factory.newInstance(message, instanceProperties)))
- {
- result.add(q.route(message, routingAddress, instanceProperties));
- break;
- }
+ String routingKey = tuple.getReplacementRoutingKey() == null
+ ? routingAddress
+ : tuple.getReplacementRoutingKey();
+ result.add(destination.route(message, routingKey, instanceProperties));
}
}
}
-
}
-
-
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java
new file mode 100644
index 0000000..3047755
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.filter.FilterManager;
+
+final class FilterManagerReplacementRoutingKeyTuple
+{
+ private final FilterManager _filterManager;
+ private final String _replacementRoutingKey; // Nullable
+
+ FilterManagerReplacementRoutingKeyTuple(final FilterManager filterManager,
+ final String replacementRoutingKey)
+ {
+ _filterManager = filterManager;
+ _replacementRoutingKey = replacementRoutingKey;
+ }
+
+ FilterManager getFilterManager()
+ {
+ return _filterManager;
+ }
+
+ String getReplacementRoutingKey()
+ {
+ return _replacementRoutingKey;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index aba2def..c14e03e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -35,7 +35,7 @@ import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Binding;
/**
* Defines binding and matching based on a set of headers.
@@ -46,8 +46,9 @@ class HeadersBinding
private final Map<String,Object> _mappings;
private final AbstractExchange.BindingIdentifier _binding;
- private final Set<String> required = new HashSet<String>();
- private final Map<String,Object> matches = new HashMap<String,Object>();
+ private final Set<String> required = new HashSet<>();
+ private final Map<String,Object> matches = new HashMap<>();
+ private final String _replacementRoutingKey;
private boolean matchAny;
private FilterManager _filter;
@@ -62,15 +63,18 @@ class HeadersBinding
public HeadersBinding(AbstractExchange.BindingIdentifier binding, Map<String,Object> arguments)
{
_binding = binding;
- if(_binding !=null)
+ arguments = arguments == null ? Collections.emptyMap() : arguments;
+ if(_binding != null)
{
- _mappings = arguments == null ? Collections.<String,Object>emptyMap() : arguments;
+ _mappings = arguments;
initMappings();
}
else
{
_mappings = null;
}
+ Object key = arguments.get(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY);
+ _replacementRoutingKey = key == null ? null : String.valueOf(key);
}
private void initMappings()
@@ -79,7 +83,7 @@ class HeadersBinding
{
try
{
- _filter = FilterSupport.createMessageFilter(_mappings, (Queue<?>) _binding.getDestination());
+ _filter = FilterSupport.createMessageFilter(_mappings, _binding.getDestination());
}
catch (AMQInvalidArgumentException e)
{
@@ -278,6 +282,11 @@ class HeadersBinding
return _binding == null ? 0 : _binding.hashCode();
}
+ public String getReplacementRoutingKey()
+ {
+ return _replacementRoutingKey;
+ }
+
private static class ExcludeAllFilter implements MessageFilter
{
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
index 6ebd1b9..65eabb3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
@@ -21,22 +21,19 @@
package org.apache.qpid.server.exchange;
import java.util.Collections;
-import java.util.ListIterator;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -72,11 +69,7 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
private static final Logger _logger = LoggerFactory.getLogger(HeadersExchangeImpl.class);
- private final ConcurrentMap<String, CopyOnWriteArraySet<BindingIdentifier>> _bindingsByKey =
- new ConcurrentHashMap<>();
-
- private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
- new CopyOnWriteArrayList<HeadersBinding>();
+ private final Set<HeadersBinding> _bindingHeaderMatchers = Collections.newSetFromMap(new ConcurrentHashMap<>());
@ManagedObjectFactoryConstructor
public HeadersExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
@@ -86,7 +79,7 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
@Override
public <M extends ServerMessage<? extends StorableMessageMetaData>> void doRoute(M payload,
- final String routingKey,
+ String routingKey,
final InstanceProperties instanceProperties,
RoutingResult<M> routingResult)
{
@@ -96,15 +89,17 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
{
if (hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
{
- BindingIdentifier b = hb.getBinding();
-
+ MessageDestination destination = hb.getBinding().getDestination();
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getName() + ": delivering message with headers " +
- payload.getMessageHeader() + " to " + b.getDestination().getName());
+ _logger.debug("Exchange '{}' delivering message with headers '{}' to '{}'",
+ getName(), payload.getMessageHeader(), destination.getName());
}
- routingResult.add(b.getDestination().route(payload, routingKey, instanceProperties));
+ String actualRoutingKey = hb.getReplacementRoutingKey() == null
+ ? routingKey
+ : hb.getReplacementRoutingKey();
+ routingResult.add(destination.route(payload, actualRoutingKey, instanceProperties));
}
}
}
@@ -113,61 +108,19 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
@Override
protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments)
{
- String bindingKey = binding.getBindingKey();
- Queue<?> queue = (Queue<?>) binding.getDestination();
-
- CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(bindingKey);
-
- if(bindings == null)
- {
- bindings = new CopyOnWriteArraySet<>();
- CopyOnWriteArraySet<BindingIdentifier> newBindings;
- if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
- {
- bindings = newBindings;
- }
- }
-
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() +
- " with binding key '" +bindingKey + "' and args: " + arguments);
- }
-
_bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
- bindings.add(binding);
-
}
@Override
protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments)
{
- HeadersBinding headersBinding = new HeadersBinding(binding, arguments);
- ListIterator<HeadersBinding> iter = _bindingHeaderMatchers.listIterator();
- while(iter.hasNext())
- {
- if(iter.next().equals(headersBinding))
- {
- iter.set(headersBinding);
- }
- }
-
+ _bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
}
@Override
protected void onUnbind(final BindingIdentifier binding)
{
- assert binding != null;
-
- CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(binding.getBindingKey());
- if(bindings != null)
- {
- bindings.remove(binding);
- }
-
- boolean removedBinding = _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.<String,Object>emptyMap()));
- _logger.debug("Removing Binding: {}", removedBinding);
-
+ _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.emptyMap()));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
index 4a30d8e..66933ec 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@@ -42,7 +43,6 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -53,8 +53,7 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
private final TopicParser _parser = new TopicParser();
- private final Map<String, TopicExchangeResult> _topicExchangeResults =
- new ConcurrentHashMap<String, TopicExchangeResult>();
+ private final Map<String, TopicExchangeResult> _topicExchangeResults = new ConcurrentHashMap<>();
private final Map<BindingIdentifier, Map<String,Object>> _bindings = new HashMap<>();
@@ -72,115 +71,48 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
_logger.debug("Updating binding of queue {} with routing key {}", destination.getName(), bindingKey);
-
String routingKey = TopicNormalizer.normalize(bindingKey);
try
{
-
if (_bindings.containsKey(binding))
{
- Map<String, Object> oldArgs = _bindings.put(binding, newArguments);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
-
- if (FilterSupport.argumentsContainFilter(newArguments))
- {
- if (FilterSupport.argumentsContainFilter(oldArgs))
- {
- result.replaceQueueFilter(destination,
- FilterSupport.createMessageFilter(oldArgs, destination),
- FilterSupport.createMessageFilter(newArguments, destination));
- }
- else
- {
- result.addFilteredQueue(destination, FilterSupport.createMessageFilter(newArguments, destination));
- result.removeUnfilteredQueue(destination);
- }
- }
- else
- {
- if (FilterSupport.argumentsContainFilter(oldArgs))
- {
- result.addUnfilteredQueue(destination);
- result.removeFilteredQueue(destination, FilterSupport.createMessageFilter(oldArgs, destination));
- }
- else
- {
- // TODO - fix control flow
- return;
- }
- }
-
+ updateTopicExchangeResult(result, binding, newArguments);
}
}
catch (AMQInvalidArgumentException e)
{
throw new ConnectionScopedRuntimeException(e);
}
-
-
}
- protected synchronized void registerQueue(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
+ private synchronized void bind(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
{
final String bindingKey = binding.getBindingKey();
- Queue<?> queue = (Queue<?>) binding.getDestination();
-
- _logger.debug("Registering queue {} with routing key {}", queue.getName(), bindingKey);
+ MessageDestination messageDestination = binding.getDestination();
+ _logger.debug("Registering messageDestination {} with routing key {}", messageDestination.getName(), bindingKey);
String routingKey = TopicNormalizer.normalize(bindingKey);
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(_bindings.containsKey(binding))
{
- Map<String,Object> oldArgs = _bindings.put(binding, arguments);
- TopicExchangeResult result = _topicExchangeResults.get(routingKey);
-
- if(FilterSupport.argumentsContainFilter(arguments))
- {
- if(FilterSupport.argumentsContainFilter(oldArgs))
- {
- result.replaceQueueFilter(queue,
- FilterSupport.createMessageFilter(oldArgs, queue),
- FilterSupport.createMessageFilter(arguments, queue));
- }
- else
- {
- result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
- result.removeUnfilteredQueue(queue);
- }
- }
- else
- {
- if(FilterSupport.argumentsContainFilter(oldArgs))
- {
- result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
- }
- else
- {
- // TODO - fix control flow
- return;
- }
- }
-
- result.addBinding(binding);
-
+ updateTopicExchangeResult(result, binding, arguments);
}
else
{
-
- TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(result == null)
{
result = new TopicExchangeResult();
if(FilterSupport.argumentsContainFilter(arguments))
{
- result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
+ result.addFilteredDestination(messageDestination, FilterSupport.createMessageFilter(arguments, messageDestination));
}
else
{
- result.addUnfilteredQueue(queue);
+ result.addUnfilteredDestination(messageDestination);
}
_parser.addBinding(routingKey, result);
_topicExchangeResults.put(routingKey,result);
@@ -189,18 +121,17 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
{
if(FilterSupport.argumentsContainFilter(arguments))
{
- result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
+ result.addFilteredDestination(messageDestination, FilterSupport.createMessageFilter(arguments, messageDestination));
}
else
{
- result.addUnfilteredQueue(queue);
+ result.addUnfilteredDestination(messageDestination);
}
}
- result.addBinding(binding);
_bindings.put(binding, arguments);
+ result.addBinding(binding, arguments);
}
-
}
@Override
@@ -213,17 +144,20 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
? ""
: routingAddress;
- final Collection<MessageDestination> matchedQueues =
- getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey);
+ final Map<MessageDestination, Set<String>> matchedDestinations =
+ getMatchedDestinations(Filterable.Factory.newInstance(payload, instanceProperties), routingKey);
- for(MessageDestination queue : matchedQueues)
+ for(Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
{
- result.add(queue.route(payload, routingAddress, instanceProperties));
+ MessageDestination destination = entry.getKey();
+ Set<String> replacementKeys = entry.getValue();
+ replacementKeys.forEach(replacementKey -> result.add(destination.route(payload, replacementKey == null ? routingAddress : replacementKey, instanceProperties)));
+
}
}
- private synchronized boolean deregisterQueue(final BindingIdentifier binding)
+ private synchronized boolean unbind(final BindingIdentifier binding)
{
if(_bindings.containsKey(binding))
{
@@ -240,8 +174,9 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
{
try
{
- result.removeFilteredQueue((Queue<?>) binding.getDestination(), FilterSupport.createMessageFilter(bindingArgs,
- (Queue<?>) binding.getDestination()));
+ result.removeFilteredDestination(binding.getDestination(),
+ FilterSupport.createMessageFilter(bindingArgs,
+ binding.getDestination()));
}
catch (AMQInvalidArgumentException e)
{
@@ -250,8 +185,10 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
}
else
{
- result.removeUnfilteredQueue((Queue<?>) binding.getDestination());
+ result.removeUnfilteredDestination(binding.getDestination());
}
+
+ // TODO: decide whether the result should be removed from _topicExchangeResults once it is empty.
return true;
}
else
@@ -260,30 +197,37 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
}
}
- private Collection<MessageDestination> getMatchedQueues(Filterable message, String routingKey)
+ private Map<MessageDestination, Set<String>> getMatchedDestinations(Filterable message, String routingKey)
{
-
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
- switch(results.size())
+ if (!results.isEmpty())
{
- case 0:
- return Collections.EMPTY_SET;
- case 1:
- TopicMatcherResult[] resultQueues = new TopicMatcherResult[1];
- results.toArray(resultQueues);
- return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
- default:
- Collection<MessageDestination> queues = new HashSet<>();
- for(TopicMatcherResult result : results)
+ Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
+ for (TopicMatcherResult result : results)
+ {
+ TopicExchangeResult topicExchangeResult = (TopicExchangeResult) result;
+ Map<MessageDestination, String> destinations = topicExchangeResult.processMessage(message);
+ if (!destinations.isEmpty())
{
- TopicExchangeResult res = (TopicExchangeResult)result;
-
- queues = res.processMessage(message, queues);
+ destinations.forEach((destination, replacementKey) ->
+ {
+ Set<String> currentKeys = matchedDestinations.get(destination);
+ if (currentKeys == null)
+ {
+ matchedDestinations.put(destination, Collections.singleton(replacementKey));
+ }
+ else if (!currentKeys.contains(replacementKey))
+ {
+ Set<String> newKeys = new HashSet<>(currentKeys);
+ newKeys.add(replacementKey);
+ matchedDestinations.put(destination, newKeys);
+ }
+ });
}
- return queues;
+ }
+ return matchedDestinations;
}
-
-
+ return Collections.emptyMap();
}
@Override
@@ -291,7 +235,7 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
{
try
{
- registerQueue(binding, arguments);
+ bind(binding, arguments);
}
catch (AMQInvalidArgumentException e)
{
@@ -303,7 +247,36 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
@Override
protected void onUnbind(final BindingIdentifier binding)
{
- deregisterQueue(binding);
+ unbind(binding);
+ }
+
+ private void updateTopicExchangeResult(final TopicExchangeResult result, final BindingIdentifier binding,
+ final Map<String, Object> newArguments)
+ throws AMQInvalidArgumentException
+ {
+ Map<String, Object> oldArgs = _bindings.put(binding, newArguments);
+ MessageDestination destination = binding.getDestination();
+
+ if (FilterSupport.argumentsContainFilter(newArguments))
+ {
+ if (FilterSupport.argumentsContainFilter(oldArgs))
+ {
+ result.replaceDestinationFilter(destination,
+ FilterSupport.createMessageFilter(oldArgs, destination),
+ FilterSupport.createMessageFilter(newArguments, destination));
+ }
+ else
+ {
+ result.addFilteredDestination(destination, FilterSupport.createMessageFilter(newArguments, destination));
+ result.removeUnfilteredDestination(destination);
+ }
+ }
+ else if (FilterSupport.argumentsContainFilter(oldArgs))
+ {
+ result.addUnfilteredDestination(destination);
+ result.removeFilteredDestination(destination, FilterSupport.createMessageFilter(oldArgs, destination));
+ }
+ result.addBinding(binding, newArguments);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index eb9bcaf..86b1f23 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -20,105 +20,69 @@
*/
package org.apache.qpid.server.exchange.topic;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.model.Binding;
public final class TopicExchangeResult implements TopicMatcherResult
{
- private final List<AbstractExchange.BindingIdentifier> _bindings = new CopyOnWriteArrayList<>();
- private final Map<MessageDestination, Integer> _unfilteredQueues = new ConcurrentHashMap<>();
- private final ConcurrentMap<MessageDestination, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
- private volatile ArrayList<MessageDestination> _unfilteredQueueList = new ArrayList<>(0);
+ private final Map<MessageDestination, Integer> _unfilteredDestinations = new ConcurrentHashMap<>();
+ private final ConcurrentMap<MessageDestination, Map<FilterManager,Integer>> _filteredDestinations = new ConcurrentHashMap<>();
+ private final Map<MessageDestination, String> _replacementKeys = new ConcurrentHashMap<>();
- public void addUnfilteredQueue(MessageDestination queue)
+ public void addUnfilteredDestination(MessageDestination destination)
{
- Integer instances = _unfilteredQueues.get(queue);
- if(instances == null)
+ _unfilteredDestinations.merge(destination, 1, (oldCount, increment) -> oldCount + increment);
+ }
+
+ public void removeUnfilteredDestination(MessageDestination destination)
+ {
+ Integer instances = _unfilteredDestinations.get(destination);
+ if(instances == 1)
{
- _unfilteredQueues.put(queue, 1);
- ArrayList<MessageDestination> newList = new ArrayList<>(_unfilteredQueueList);
- newList.add(queue);
- _unfilteredQueueList = newList;
+ _unfilteredDestinations.remove(destination);
}
else
{
- _unfilteredQueues.put(queue, instances + 1);
+ _unfilteredDestinations.put(destination, instances - 1);
}
}
- public void removeUnfilteredQueue(MessageDestination queue)
+ public void addBinding(AbstractExchange.BindingIdentifier binding, Map<String, Object> bindingArguments)
{
- Integer instances = _unfilteredQueues.get(queue);
- if(instances == 1)
+ Object keyObject = bindingArguments.get(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY);
+ if (keyObject == null)
{
- _unfilteredQueues.remove(queue);
- ArrayList<MessageDestination> newList = new ArrayList<>(_unfilteredQueueList);
- newList.remove(queue);
- _unfilteredQueueList = newList;
-
+ _replacementKeys.remove(binding.getDestination());
}
else
{
- _unfilteredQueues.put(queue,instances - 1);
+ _replacementKeys.put(binding.getDestination(), String.valueOf(keyObject));
}
-
}
- public Collection<MessageDestination> getUnfilteredQueues()
- {
- return _unfilteredQueues.keySet();
- }
-
- public void addBinding(AbstractExchange.BindingIdentifier binding)
- {
- _bindings.add(binding);
- }
-
public void removeBinding(AbstractExchange.BindingIdentifier binding)
{
- _bindings.remove(binding);
- }
-
- public List<AbstractExchange.BindingIdentifier> getBindings()
- {
- return new ArrayList<>(_bindings);
+ _replacementKeys.remove(binding.getDestination());
}
- public void addFilteredQueue(MessageDestination queue, FilterManager filter)
+ public void addFilteredDestination(MessageDestination destination, FilterManager filter)
{
- Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
- if(filters == null)
- {
- filters = new ConcurrentHashMap<>();
- _filteredQueues.put(queue, filters);
- }
- Integer instances = filters.get(filter);
- if(instances == null)
- {
- filters.put(filter,1);
- }
- else
- {
- filters.put(filter, instances + 1);
- }
-
+ Map<FilterManager, Integer> filters =
+ _filteredDestinations.computeIfAbsent(destination, filterManagerMap -> new ConcurrentHashMap<>());
+ filters.merge(filter, 1, (oldCount, increment) -> oldCount + increment);
}
- public void removeFilteredQueue(MessageDestination queue, FilterManager filter)
+ public void removeFilteredDestination(MessageDestination destination, FilterManager filter)
{
- Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> filters = _filteredDestinations.get(destination);
if(filters != null)
{
Integer instances = filters.get(filter);
@@ -129,7 +93,7 @@ public final class TopicExchangeResult implements TopicMatcherResult
filters.remove(filter);
if(filters.isEmpty())
{
- _filteredQueues.remove(queue);
+ _filteredDestinations.remove(destination);
}
}
else
@@ -142,11 +106,11 @@ public final class TopicExchangeResult implements TopicMatcherResult
}
- public void replaceQueueFilter(MessageDestination queue,
- FilterManager oldFilter,
- FilterManager newFilter)
+ public void replaceDestinationFilter(MessageDestination queue,
+ FilterManager oldFilter,
+ FilterManager newFilter)
{
- Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> filters = _filteredDestinations.get(queue);
Map<FilterManager,Integer> newFilters = new ConcurrentHashMap<>(filters);
Integer oldFilterInstances = filters.get(oldFilter);
if(oldFilterInstances == 1)
@@ -166,45 +130,35 @@ public final class TopicExchangeResult implements TopicMatcherResult
{
newFilters.put(newFilter, newFilterInstances+1);
}
- _filteredQueues.put(queue,newFilters);
+ _filteredDestinations.put(queue, newFilters);
}
- public Collection<MessageDestination> processMessage(Filterable msg, Collection<MessageDestination> queues)
+ public Map<MessageDestination, String> processMessage(Filterable msg)
{
- if(queues == null)
- {
- if(_filteredQueues.isEmpty())
- {
- return _unfilteredQueueList;
- }
- else
- {
- queues = new HashSet<>();
- }
- }
- else if(!(queues instanceof Set))
+ Map<MessageDestination, String> result = new HashMap<>();
+ for(MessageDestination unfilteredDestination: _unfilteredDestinations.keySet())
{
- queues = new HashSet<>(queues);
+ result.put(unfilteredDestination, _replacementKeys.get(unfilteredDestination));
}
- queues.addAll(_unfilteredQueues.keySet());
- if(!_filteredQueues.isEmpty())
+ if(!_filteredDestinations.isEmpty())
{
- for(Map.Entry<MessageDestination, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet())
+ for(Map.Entry<MessageDestination, Map<FilterManager, Integer>> entry : _filteredDestinations.entrySet())
{
- if(!queues.contains(entry.getKey()))
+ MessageDestination destination = entry.getKey();
+ if(!_unfilteredDestinations.containsKey(destination))
{
for(FilterManager filter : entry.getValue().keySet())
{
if(filter.allAllow(msg))
{
- queues.add(entry.getKey());
+ result.put(destination, _replacementKeys.get(destination));
}
}
}
}
}
- return queues;
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
index 214ca23..bfe6fbe 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
@@ -35,7 +35,7 @@ public class TopicParser
private static final String TOPIC_DELIMITER = "\\.";
private final TopicWordDictionary _dictionary = new TopicWordDictionary();
- private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
+ private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<>();
private static class Position
{
@@ -46,7 +46,7 @@ public class TopicParser
private boolean _followedByAnyLoop;
- public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
+ private Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
{
_position = position;
_word = word;
@@ -55,32 +55,32 @@ public class TopicParser
}
- public TopicWord getWord()
+ private TopicWord getWord()
{
return _word;
}
- public boolean isSelfTransition()
+ private boolean isSelfTransition()
{
return _selfTransition;
}
- public int getPosition()
+ private int getPosition()
{
return _position;
}
- public boolean isEndState()
+ private boolean isEndState()
{
return _endState;
}
- public boolean isFollowedByAnyLoop()
+ private boolean isFollowedByAnyLoop()
{
return _followedByAnyLoop;
}
- public void setFollowedByAnyLoop(boolean followedByAnyLoop)
+ private void setFollowedByAnyLoop(boolean followedByAnyLoop)
{
_followedByAnyLoop = followedByAnyLoop;
}
@@ -123,7 +123,7 @@ public class TopicParser
TopicMatcherDFAState stateMachine = _stateMachine.get();
if(stateMachine == null)
{
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
else
{
@@ -132,7 +132,7 @@ public class TopicParser
}
- TopicMatcherDFAState createStateMachine(String bindingKey, TopicMatcherResult result)
+ private TopicMatcherDFAState createStateMachine(String bindingKey, TopicMatcherResult result)
{
List<TopicWord> wordList = createTopicWordList(bindingKey);
int wildCards = 0;
@@ -146,17 +146,17 @@ public class TopicParser
if(wildCards == 0)
{
TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1];
- states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result));
+ states[states.length-1] = new TopicMatcherDFAState(Collections.emptyMap(), Collections.singleton(result));
for(int i = states.length-2; i >= 0; i--)
{
- states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET);
+ states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.emptySet());
}
return states[0];
}
else if(wildCards == wordList.size())
{
- Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<TopicWord,TopicMatcherDFAState>();
+ Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<>();
TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result));
stateMap.put(TopicWord.ANY_WORD, state);
return state;
@@ -232,7 +232,7 @@ public class TopicParser
// we approach this by examining steps of increasing length - so we
// look how far we can go from the start position in 1 word, 2 words, etc...
- Map<Set<Position>,SimpleState> stateMap = new HashMap<Set<Position>,SimpleState>();
+ Map<Set<Position>,SimpleState> stateMap = new HashMap<>();
SimpleState state = new SimpleState();
@@ -243,7 +243,7 @@ public class TopicParser
SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]);
HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length];
- Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<SimpleState, TopicMatcherDFAState>();
+ Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<>();
for(int i = 0; i < simpleStates.length; i++)
{
@@ -266,10 +266,10 @@ public class TopicParser
}
else
{
- results = Collections.EMPTY_SET;
+ results = Collections.emptySet();
}
- dfaStateMaps[i] = new HashMap<TopicWord, TopicMatcherDFAState>();
+ dfaStateMaps[i] = new HashMap<>();
simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results));
}
@@ -295,7 +295,7 @@ public class TopicParser
final Map<Set<Position>, SimpleState> stateMap,
final Position[] positions)
{
- Map<TopicWord, Set<Position>> transitions = new HashMap<TopicWord,Set<Position>>();
+ Map<TopicWord, Set<Position>> transitions = new HashMap<>();
for(Position pos : state._positions)
{
@@ -304,7 +304,7 @@ public class TopicParser
Set<Position> dest = transitions.get(TopicWord.ANY_WORD);
if(dest == null)
{
- dest = new HashSet<Position>();
+ dest = new HashSet<>();
transitions.put(TopicWord.ANY_WORD,dest);
}
dest.add(pos);
@@ -316,7 +316,7 @@ public class TopicParser
Set<Position> dest = transitions.get(pos.getWord());
if(dest == null)
{
- dest = new HashSet<Position>();
+ dest = new HashSet<>();
transitions.put(pos.getWord(),dest);
}
dest.add(nextPosition);
@@ -332,7 +332,7 @@ public class TopicParser
}
}
- state._nextState = new HashMap<TopicWord, SimpleState>();
+ state._nextState = new HashMap<>();
for(Map.Entry<TopicWord,Set<Position>> dest : transitions.entrySet())
{
@@ -370,7 +370,7 @@ public class TopicParser
}
if(anyLoop != null)
{
- Collection<Position> removals = new ArrayList<Position>();
+ Collection<Position> removals = new ArrayList<>();
for(Position destPos : dest.getValue())
{
if(destPos.getPosition() < anyLoop.getPosition())
@@ -402,7 +402,7 @@ public class TopicParser
SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD);
if(anyWordState != null)
{
- List<TopicWord> removeList = new ArrayList<TopicWord>();
+ List<TopicWord> removeList = new ArrayList<>();
for(Map.Entry<TopicWord,SimpleState> entry : state._nextState.entrySet())
{
if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD)
@@ -424,7 +424,7 @@ public class TopicParser
String[] tokens = bindingKey.split(TOPIC_DELIMITER);
TopicWord previousWord = null;
- List<TopicWord> wordList = new ArrayList<TopicWord>();
+ List<TopicWord> wordList = new ArrayList<>();
for(String token : tokens)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
index bb18bcc..7f0979e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.message;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -177,6 +178,16 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
return refusalMessages.toString();
}
+ public int getNumberOfRoutes()
+ {
+ return _queues.size();
+ }
+
+ public Collection<BaseQueue> getRoutes()
+ {
+ return Collections.unmodifiableCollection(_queues);
+ }
+
private static class RejectReason
{
private final RejectType _rejectType;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
index 5c1a2d2..8f6089f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -25,6 +25,7 @@ import java.util.Map;
@ManagedAttributeValueType
public interface Binding extends PublishingLink
{
+ String BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY = "x-replacement-routing-key";
String TYPE = "binding";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 2b5614b..3dd62b5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -104,7 +104,8 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
@Param(name = "arguments", defaultValue = "{}") Map<String, Object> arguments,
@Param(name = "replaceExistingArguments", defaultValue = "false") boolean replaceExistingArguments);
- @ManagedOperation(changesConfiguredObjectState = true)
+ @ManagedOperation(changesConfiguredObjectState = true,
+ description = "Deletes all the bindings matching the given destination and bindingKey")
boolean unbind(@Param(name="destination", mandatory = true) String destination,
@Param(name="bindingKey") String bindingKey);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org