You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/09/19 12:29:18 UTC

[4/4] qpid-broker-j git commit: QPID-8244: Optimize topic exchange message matching functionality

QPID-8244: Optimize topic exchange message matching functionality


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/5578e943
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/5578e943
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/5578e943

Branch: refs/heads/master
Commit: 5578e9438db1ddebd43c571b35b78bc7e5467e8f
Parents: 77d461a
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Sep 3 15:58:33 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Sep 19 13:24:17 2018 +0100

----------------------------------------------------------------------
 .../qpid/server/exchange/TopicExchangeImpl.java |  41 +++-----
 .../exchange/topic/TopicExchangeResult.java     |  59 ++++++++++-
 .../exchange/topic/TopicExchangeResultTest.java | 103 +++++++++++++++++++
 3 files changed, 174 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5578e943/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
index 39b4718..c7f6bc7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
@@ -140,19 +140,18 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
                                                                                      InstanceProperties instanceProperties,
                                                                                      RoutingResult<M> result)
     {
-        final String routingKey = routingAddress == null
-                ? ""
-                : routingAddress;
+        final String routingKey = routingAddress == null ? "" : routingAddress;
 
         final Map<MessageDestination, Set<String>> matchedDestinations =
                 getMatchedDestinations(Filterable.Factory.newInstance(payload, instanceProperties), routingKey);
 
-        for(Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
+        if (!matchedDestinations.isEmpty())
         {
-            MessageDestination destination = entry.getKey();
-            Set<String> replacementKeys = entry.getValue();
-            replacementKeys.forEach(replacementKey -> result.add(destination.route(payload, replacementKey == null ? routingAddress : replacementKey, instanceProperties)));
-
+            for (Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
+            {
+                MessageDestination destination = entry.getKey();
+                entry.getValue().forEach(key -> result.add(destination.route(payload, key, instanceProperties)));
+            }
         }
     }
 
@@ -197,32 +196,18 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
         }
     }
 
-    private Map<MessageDestination, Set<String>> getMatchedDestinations(Filterable message, String routingKey)
+    private Map<MessageDestination, Set<String>> getMatchedDestinations(final Filterable message,
+                                                                        final String routingKey)
     {
-        Collection<TopicMatcherResult> results = _parser.parse(routingKey);
+        final Collection<TopicMatcherResult> results = _parser.parse(routingKey);
         if (!results.isEmpty())
         {
-            Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
+            final Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
             for (TopicMatcherResult result : results)
             {
-                TopicExchangeResult topicExchangeResult = (TopicExchangeResult) result;
-                Map<MessageDestination, String> destinations = topicExchangeResult.processMessage(message);
-                if (!destinations.isEmpty())
+                if (result instanceof TopicExchangeResult)
                 {
-                    destinations.forEach((destination, replacementKey) ->
-                                 {
-                                     Set<String> currentKeys = matchedDestinations.get(destination);
-                                     if (currentKeys == null)
-                                     {
-                                         matchedDestinations.put(destination, Collections.singleton(replacementKey));
-                                     }
-                                     else if (!currentKeys.contains(replacementKey))
-                                     {
-                                         Set<String> newKeys = new HashSet<>(currentKeys);
-                                         newKeys.add(replacementKey);
-                                         matchedDestinations.put(destination, newKeys);
-                                     }
-                                 });
+                    ((TopicExchangeResult) result).processMessage(message, matchedDestinations, routingKey);
                 }
             }
             return matchedDestinations;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5578e943/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index efb51af..6855495 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -20,8 +20,11 @@
  */
 package org.apache.qpid.server.exchange.topic;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -133,10 +136,11 @@ public final class TopicExchangeResult implements TopicMatcherResult
         _filteredDestinations.put(queue, newFilters);
     }
 
+    @Deprecated
     public Map<MessageDestination, String> processMessage(Filterable msg)
     {
         Map<MessageDestination, String> result = new HashMap<>();
-        for(MessageDestination unfilteredDestination: _unfilteredDestinations.keySet())
+        for (MessageDestination unfilteredDestination : _unfilteredDestinations.keySet())
         {
             result.put(unfilteredDestination, _replacementKeys.get(unfilteredDestination));
         }
@@ -161,4 +165,57 @@ public final class TopicExchangeResult implements TopicMatcherResult
         return result;
     }
 
+    public void processMessage(final Filterable msg,  // message handed to each FilterManager.allAllow check
+                               final Map<MessageDestination, Set<String>> result,  // accumulator (destination -> keys), mutated in place via addMatch
+                               final String routingKey)  // key recorded when a destination has no replacement key (see addMatch)
+    {
+        if (!_unfilteredDestinations.isEmpty())
+        {
+            for (MessageDestination unfilteredDestination : _unfilteredDestinations.keySet())
+            {
+                addMatch(unfilteredDestination, result, routingKey);  // added without consulting any filter
+            }
+        }
+
+        if (!_filteredDestinations.isEmpty())
+        {
+            for (Map.Entry<MessageDestination, Map<FilterManager, Integer>> entry : _filteredDestinations.entrySet())
+            {
+                MessageDestination destination = entry.getKey();
+                if (!_unfilteredDestinations.containsKey(destination))  // already added by the loop above; skip filter evaluation
+                {
+                    for (FilterManager filter : entry.getValue().keySet())
+                    {
+                        if (filter.allAllow(msg))
+                        {
+                            addMatch(destination, result, routingKey);
+                            break;  // one accepting filter is enough for this destination
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void addMatch(MessageDestination destination,  // destination that matched
+                          Map<MessageDestination, Set<String>> result,  // accumulator updated in place
+                          String routingKey)  // fallback when no replacement key is recorded for the destination
+    {
+        String replacementKey = _replacementKeys.getOrDefault(destination, routingKey);  // recorded replacement key, else the supplied routingKey
+        Set<String> currentKeys = result.get(destination);
+        if (currentKeys == null)  // first key seen for this destination
+        {
+            result.put(destination, Collections.singleton(replacementKey));  // immutable one-element set; presumably avoids a HashSet for the single-key case
+        }
+        else if (!currentKeys.contains(replacementKey))  // nothing to do when the key is already present
+        {
+            if (currentKeys.size() == 1)  // a one-element set is presumably the immutable singleton stored above - copy before adding
+            {
+                currentKeys = new HashSet<>(currentKeys);  // mutable copy replaces the singleton in the map
+                result.put(destination, currentKeys);
+            }
+            currentKeys.add(replacementKey);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5578e943/broker-core/src/test/java/org/apache/qpid/server/exchange/topic/TopicExchangeResultTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/topic/TopicExchangeResultTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/topic/TopicExchangeResultTest.java
new file mode 100644
index 0000000..f709ad0
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/topic/TopicExchangeResultTest.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange.topic;
+
+import static org.apache.qpid.server.model.Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.MessageDestination;
+
+public class TopicExchangeResultTest  // unit tests for TopicExchangeResult.processMessage(Filterable, Map, String)
+{
+
+    @Test
+    public void processMessageForUnfilteredDestinations()  // unfiltered destinations: routing key vs. replacement key
+    {
+        final TopicExchangeResult result = new TopicExchangeResult();
+
+        final MessageDestination unfilteredDestination1 = mock(MessageDestination.class);
+        result.addUnfilteredDestination(unfilteredDestination1);
+        result.addBinding(new AbstractExchange.BindingIdentifier("key1", unfilteredDestination1),
+                          Collections.emptyMap());  // binding without a replacement routing key
+
+        final MessageDestination unfilteredDestination2 = mock(MessageDestination.class);
+        result.addUnfilteredDestination(unfilteredDestination2);
+        result.addBinding(new AbstractExchange.BindingIdentifier("key2", unfilteredDestination2),
+                          Collections.singletonMap(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "replacement"));  // binding with a replacement routing key
+
+        final Filterable msg = mock(Filterable.class);
+        final Map<MessageDestination, Set<String>> matches = new HashMap<>();
+
+        result.processMessage(msg, matches, "test");
+
+        assertTrue("Unfiltered destination is not found ", matches.containsKey(unfilteredDestination1));
+        assertEquals(Collections.singleton("test"), matches.get(unfilteredDestination1));  // no replacement key: the supplied routing key is reported
+        assertTrue("Replacement key destination is not found ", matches.containsKey(unfilteredDestination2));
+        assertEquals(Collections.singleton("replacement"), matches.get(unfilteredDestination2));  // replacement key is reported instead of "test"
+    }
+
+
+    @Test
+    public void processMessageForFilteredDestinations()  // filtered destinations: only those whose filter accepts the message match
+    {
+        final TopicExchangeResult result = new TopicExchangeResult();
+
+        final MessageDestination matchingFilteredDestination = mock(MessageDestination.class);
+        final FilterManager matchingFilter = mock(FilterManager.class);
+        result.addFilteredDestination(matchingFilteredDestination, matchingFilter);
+        result.addBinding(new AbstractExchange.BindingIdentifier("key1", matchingFilteredDestination),
+                          Collections.emptyMap());
+        result.addBinding(new AbstractExchange.BindingIdentifier("key3", matchingFilteredDestination),
+                          Collections.singletonMap(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "replacement"));  // second binding on the same destination adds a replacement key
+
+        final MessageDestination notMatchingFilteredDestination = mock(MessageDestination.class);
+        final FilterManager nonMatchingFilter = mock(FilterManager.class);
+        result.addFilteredDestination(notMatchingFilteredDestination, nonMatchingFilter);
+        result.addBinding(new AbstractExchange.BindingIdentifier("key2", notMatchingFilteredDestination),
+                          Collections.emptyMap());
+
+        final Filterable msg = mock(Filterable.class);
+        when(matchingFilter.allAllow(msg)).thenReturn(true);  // this filter accepts the message
+        when(nonMatchingFilter.allAllow(msg)).thenReturn(false);  // this filter rejects the message
+
+        final Map<MessageDestination, Set<String>> matches = new HashMap<>();
+        result.processMessage(msg, matches, "test");
+
+        assertTrue("Matched destination is not found ", matches.containsKey(matchingFilteredDestination));
+        assertEquals(Collections.singleton("replacement"), matches.get(matchingFilteredDestination));  // replacement key is reported instead of "test"
+        assertFalse("Non-matching filtered destination is found ", matches.containsKey(notMatchingFilteredDestination));  // message corrected: failure means the rejected destination WAS matched
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org