You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/09/19 12:29:16 UTC

[2/4] qpid-broker-j git commit: Revert "QPID-8238: Optimize topic exchange message matching functionality"

Revert "QPID-8238: Optimize topic exchange message matching functionality"

This reverts commit 0510ed3972bbc27175ca399e0239cadfa0f91fb4.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/aaf575fc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/aaf575fc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/aaf575fc

Branch: refs/heads/master
Commit: aaf575fcdbcd021ff4865e920159363df1ddb168
Parents: 4cc3f78
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Sep 19 12:46:26 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Sep 19 12:46:26 2018 +0100

----------------------------------------------------------------------
 .../qpid/server/exchange/TopicExchangeImpl.java | 41 +++++++++-----
 .../exchange/topic/TopicExchangeResult.java     | 59 +-------------------
 2 files changed, 29 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aaf575fc/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
index c7f6bc7..39b4718 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
@@ -140,18 +140,19 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
                                                                                      InstanceProperties instanceProperties,
                                                                                      RoutingResult<M> result)
     {
-        final String routingKey = routingAddress == null ? "" : routingAddress;
+        final String routingKey = routingAddress == null
+                ? ""
+                : routingAddress;
 
         final Map<MessageDestination, Set<String>> matchedDestinations =
                 getMatchedDestinations(Filterable.Factory.newInstance(payload, instanceProperties), routingKey);
 
-        if (!matchedDestinations.isEmpty())
+        for(Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
         {
-            for (Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
-            {
-                MessageDestination destination = entry.getKey();
-                entry.getValue().forEach(key -> result.add(destination.route(payload, key, instanceProperties)));
-            }
+            MessageDestination destination = entry.getKey();
+            Set<String> replacementKeys = entry.getValue();
+            replacementKeys.forEach(replacementKey -> result.add(destination.route(payload, replacementKey == null ? routingAddress : replacementKey, instanceProperties)));
+
         }
     }
 
@@ -196,18 +197,32 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
         }
     }
 
-    private Map<MessageDestination, Set<String>> getMatchedDestinations(final Filterable message,
-                                                                        final String routingKey)
+    private Map<MessageDestination, Set<String>> getMatchedDestinations(Filterable message, String routingKey)
     {
-        final Collection<TopicMatcherResult> results = _parser.parse(routingKey);
+        Collection<TopicMatcherResult> results = _parser.parse(routingKey);
         if (!results.isEmpty())
         {
-            final Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
+            Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
             for (TopicMatcherResult result : results)
             {
-                if (result instanceof TopicExchangeResult)
+                TopicExchangeResult topicExchangeResult = (TopicExchangeResult) result;
+                Map<MessageDestination, String> destinations = topicExchangeResult.processMessage(message);
+                if (!destinations.isEmpty())
                 {
-                    ((TopicExchangeResult) result).processMessage(message, matchedDestinations, routingKey);
+                    destinations.forEach((destination, replacementKey) ->
+                                 {
+                                     Set<String> currentKeys = matchedDestinations.get(destination);
+                                     if (currentKeys == null)
+                                     {
+                                         matchedDestinations.put(destination, Collections.singleton(replacementKey));
+                                     }
+                                     else if (!currentKeys.contains(replacementKey))
+                                     {
+                                         Set<String> newKeys = new HashSet<>(currentKeys);
+                                         newKeys.add(replacementKey);
+                                         matchedDestinations.put(destination, newKeys);
+                                     }
+                                 });
                 }
             }
             return matchedDestinations;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aaf575fc/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index 6855495..efb51af 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -20,11 +20,8 @@
  */
 package org.apache.qpid.server.exchange.topic;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -136,11 +133,10 @@ public final class TopicExchangeResult implements TopicMatcherResult
         _filteredDestinations.put(queue, newFilters);
     }
 
-    @Deprecated
     public Map<MessageDestination, String> processMessage(Filterable msg)
     {
         Map<MessageDestination, String> result = new HashMap<>();
-        for (MessageDestination unfilteredDestination : _unfilteredDestinations.keySet())
+        for(MessageDestination unfilteredDestination: _unfilteredDestinations.keySet())
         {
             result.put(unfilteredDestination, _replacementKeys.get(unfilteredDestination));
         }
@@ -165,57 +161,4 @@ public final class TopicExchangeResult implements TopicMatcherResult
         return result;
     }
 
-    public void processMessage(final Filterable msg,
-                               final Map<MessageDestination, Set<String>> result,
-                               final String routingKey)
-    {
-        if (!_unfilteredDestinations.isEmpty())
-        {
-            for (MessageDestination unfilteredDestination : _unfilteredDestinations.keySet())
-            {
-                addMatch(unfilteredDestination, result, routingKey);
-            }
-        }
-
-        if (!_filteredDestinations.isEmpty())
-        {
-            for (Map.Entry<MessageDestination, Map<FilterManager, Integer>> entry : _filteredDestinations.entrySet())
-            {
-                MessageDestination destination = entry.getKey();
-                if (!_unfilteredDestinations.containsKey(destination))
-                {
-                    for (FilterManager filter : entry.getValue().keySet())
-                    {
-                        if (filter.allAllow(msg))
-                        {
-                            addMatch(destination, result, routingKey);
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    private void addMatch(MessageDestination destination,
-                          Map<MessageDestination, Set<String>> result,
-                          String routingKey)
-    {
-        String replacementKey = _replacementKeys.getOrDefault(destination, routingKey);
-        Set<String> currentKeys = result.get(destination);
-        if (currentKeys == null)
-        {
-            result.put(destination, Collections.singleton(replacementKey));
-        }
-        else if (!currentKeys.contains(replacementKey))
-        {
-            if (currentKeys.size() == 1)
-            {
-                currentKeys = new HashSet<>(currentKeys);
-                result.put(destination, currentKeys);
-            }
-            currentKeys.add(replacementKey);
-        }
-    }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org