You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/09/10 00:11:11 UTC
[5/6] qpid-broker-j git commit: QPID-8238: Optimize topic exchange message matching functionality
QPID-8238: Optimize topic exchange message matching functionality
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0510ed39
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0510ed39
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0510ed39
Branch: refs/heads/master
Commit: 0510ed3972bbc27175ca399e0239cadfa0f91fb4
Parents: 8c58e38
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Sep 3 15:58:33 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Sep 10 01:10:54 2018 +0100
----------------------------------------------------------------------
.../qpid/server/exchange/TopicExchangeImpl.java | 41 +++++---------
.../exchange/topic/TopicExchangeResult.java | 59 +++++++++++++++++++-
2 files changed, 71 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0510ed39/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
index 39b4718..c7f6bc7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
@@ -140,19 +140,18 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
InstanceProperties instanceProperties,
RoutingResult<M> result)
{
- final String routingKey = routingAddress == null
- ? ""
- : routingAddress;
+ final String routingKey = routingAddress == null ? "" : routingAddress;
final Map<MessageDestination, Set<String>> matchedDestinations =
getMatchedDestinations(Filterable.Factory.newInstance(payload, instanceProperties), routingKey);
- for(Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
+ if (!matchedDestinations.isEmpty())
{
- MessageDestination destination = entry.getKey();
- Set<String> replacementKeys = entry.getValue();
- replacementKeys.forEach(replacementKey -> result.add(destination.route(payload, replacementKey == null ? routingAddress : replacementKey, instanceProperties)));
-
+ for (Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
+ {
+ MessageDestination destination = entry.getKey();
+ entry.getValue().forEach(key -> result.add(destination.route(payload, key, instanceProperties)));
+ }
}
}
@@ -197,32 +196,18 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
}
}
- private Map<MessageDestination, Set<String>> getMatchedDestinations(Filterable message, String routingKey)
+ private Map<MessageDestination, Set<String>> getMatchedDestinations(final Filterable message,
+ final String routingKey)
{
- Collection<TopicMatcherResult> results = _parser.parse(routingKey);
+ final Collection<TopicMatcherResult> results = _parser.parse(routingKey);
if (!results.isEmpty())
{
- Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
+ final Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
for (TopicMatcherResult result : results)
{
- TopicExchangeResult topicExchangeResult = (TopicExchangeResult) result;
- Map<MessageDestination, String> destinations = topicExchangeResult.processMessage(message);
- if (!destinations.isEmpty())
+ if (result instanceof TopicExchangeResult)
{
- destinations.forEach((destination, replacementKey) ->
- {
- Set<String> currentKeys = matchedDestinations.get(destination);
- if (currentKeys == null)
- {
- matchedDestinations.put(destination, Collections.singleton(replacementKey));
- }
- else if (!currentKeys.contains(replacementKey))
- {
- Set<String> newKeys = new HashSet<>(currentKeys);
- newKeys.add(replacementKey);
- matchedDestinations.put(destination, newKeys);
- }
- });
+ ((TopicExchangeResult) result).processMessage(message, matchedDestinations, routingKey);
}
}
return matchedDestinations;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0510ed39/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index efb51af..6855495 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.exchange.topic;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -133,10 +136,11 @@ public final class TopicExchangeResult implements TopicMatcherResult
_filteredDestinations.put(queue, newFilters);
}
+ @Deprecated
public Map<MessageDestination, String> processMessage(Filterable msg)
{
Map<MessageDestination, String> result = new HashMap<>();
- for(MessageDestination unfilteredDestination: _unfilteredDestinations.keySet())
+ for (MessageDestination unfilteredDestination : _unfilteredDestinations.keySet())
{
result.put(unfilteredDestination, _replacementKeys.get(unfilteredDestination));
}
@@ -161,4 +165,57 @@ public final class TopicExchangeResult implements TopicMatcherResult
return result;
}
+ public void processMessage(final Filterable msg,
+ final Map<MessageDestination, Set<String>> result,
+ final String routingKey)
+ {
+ if (!_unfilteredDestinations.isEmpty())
+ {
+ for (MessageDestination unfilteredDestination : _unfilteredDestinations.keySet())
+ {
+ addMatch(unfilteredDestination, result, routingKey);
+ }
+ }
+
+ if (!_filteredDestinations.isEmpty())
+ {
+ for (Map.Entry<MessageDestination, Map<FilterManager, Integer>> entry : _filteredDestinations.entrySet())
+ {
+ MessageDestination destination = entry.getKey();
+ if (!_unfilteredDestinations.containsKey(destination))
+ {
+ for (FilterManager filter : entry.getValue().keySet())
+ {
+ if (filter.allAllow(msg))
+ {
+ addMatch(destination, result, routingKey);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void addMatch(MessageDestination destination,
+ Map<MessageDestination, Set<String>> result,
+ String routingKey)
+ {
+ String replacementKey = _replacementKeys.getOrDefault(destination, routingKey);
+ Set<String> currentKeys = result.get(destination);
+ if (currentKeys == null)
+ {
+ result.put(destination, Collections.singleton(replacementKey));
+ }
+ else if (!currentKeys.contains(replacementKey))
+ {
+ if (currentKeys.size() == 1)
+ {
+ currentKeys = new HashSet<>(currentKeys);
+ result.put(destination, currentKeys);
+ }
+ currentKeys.add(replacementKey);
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org