You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2014/02/07 09:42:05 UTC

git commit: Fix for AMQ-4899

Updated Branches:
  refs/heads/trunk f88043eaf -> afded924f


Fix for AMQ-4899


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/afded924
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/afded924
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/afded924

Branch: refs/heads/trunk
Commit: afded924ffde28c1f491db761774e14c5e954413
Parents: f88043e
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Fri Feb 7 09:41:50 2014 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Fri Feb 7 09:41:50 2014 +0100

----------------------------------------------------------------------
 .../SelectorAwareVirtualTopicInterceptor.java   |  18 +-
 .../plugin/SubQueueSelectorCacheBroker.java     |  35 ++--
 .../org/apache/activemq/bugs/AMQ4899Test.java   | 192 +++++++++++++++++++
 3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
index 8d58a43..0c19565 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
@@ -16,10 +16,6 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
@@ -35,6 +31,10 @@ import org.apache.activemq.util.LRUCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
 public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
     private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
     LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
@@ -70,10 +70,9 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
             if (sub.matches(message, msgContext)) {
                 matches = true;
                 break;
-
             }
         }
-        if (matches == false && subs.size() == 0) {
+        if (matches == false) {
             matches = tryMatchingCachedSubs(broker, dest, msgContext);
         }
         return matches;
@@ -87,11 +86,14 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
         final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
 
         if (cache != null) {
-            final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
-            if (selector != null) {
+            final Set<String> selectors = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
+            for (String selector : selectors) {
                 try {
                     final BooleanExpression expression = getExpression(selector);
                     matches = expression.matches(msgContext);
+                    if (matches) {
+                        return true;
+                    }
                 } catch (Exception e) {
                     LOG.error(e.getMessage(), e);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
index cfa50f7..af02b54 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
@@ -16,14 +16,6 @@
  */
 package org.apache.activemq.plugin;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
@@ -32,6 +24,17 @@ import org.apache.activemq.command.ConsumerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * A plugin which allows the caching of the selector from a subscription queue.
  * <p/>
@@ -51,7 +54,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
      * The subscription's selector cache. We cache compiled expressions keyed
      * by the target destination.
      */
-    private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
+    private ConcurrentHashMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
 
     private final File persistFile;
 
@@ -85,7 +88,8 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
 
     @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), info.getDestination().getQualifiedName());
+        String destinationName = info.getDestination().getQualifiedName();
+        LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), destinationName);
         String selector = info.getSelector();
 
         // As ConcurrentHashMap doesn't support null values, use always true expression
@@ -93,7 +97,12 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
             selector = "TRUE";
         }
 
-        subSelectorCache.put(info.getDestination().getQualifiedName(), selector);
+        Set<String> selectors = subSelectorCache.get(destinationName);
+        if (selectors == null) {
+            selectors = Collections.synchronizedSet(new HashSet<String>());
+        }
+        selectors.add(selector);
+        subSelectorCache.put(destinationName, selectors);
 
         return super.addConsumer(context, info);
     }
@@ -105,7 +114,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
                 try {
                     ObjectInputStream in = new ObjectInputStream(fis);
                     try {
-                        subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
+                        subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject();
                     } catch (ClassNotFoundException ex) {
                         LOG.error("Invalid selector cache data found. Please remove file.", ex);
                     } finally {
@@ -148,7 +157,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
     /**
      * @return The JMS selector for the specified {@code destination}
      */
-    public String getSelector(final String destination) {
+    public Set<String> getSelector(final String destination) {
         return subSelectorCache.get(destination);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
new file mode 100644
index 0000000..81140ce
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ4899Test {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class);
+    private static final String QUEUE_NAME="AMQ4899TestQueue";
+    private static final String CONSUMER_QUEUE="Consumer.Orders.VirtualOrders." + QUEUE_NAME;
+    private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." + QUEUE_NAME;
+
+    private static final Integer MESSAGE_LIMIT = 20;
+    public static final String CONSUMER_A_SELECTOR = "Order < " + 10;
+    public static  String CONSUMER_B_SELECTOR = "Order >= " + 10;
+    private CountDownLatch consumersStarted = new CountDownLatch(2);
+    private CountDownLatch consumerAtoConsumeCount= new CountDownLatch(10);
+    private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10);
+
+    private BrokerService broker;
+
+    @Before
+    public void setUp() {
+        setupBroker("broker://()/localhost?");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testVirtualTopicMultipleSelectors() throws Exception{
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue consumerQueue = session.createQueue(CONSUMER_QUEUE);
+
+        MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount);
+        MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
+        consumerA.setMessageListener(listenerA);
+
+        MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount);
+        MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
+        consumerB.setMessageListener(listenerB);
+
+        consumersStarted.await(10, TimeUnit.SECONDS);
+        assertEquals("Not all consumers started in time", 0, consumersStarted.getCount());
+
+        Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME);
+        MessageProducer producer = session.createProducer(producerDestination);
+        int messageIndex = 0;
+        for (int i=0; i < MESSAGE_LIMIT; i++) {
+            if (i==3) {
+                LOG.debug("Stopping consumerA");
+                consumerA.close();
+            }
+
+            if (i == 14) {
+                LOG.debug("Stopping consumer B");
+                consumerB.close();
+            }
+            String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString();
+            TextMessage message = session.createTextMessage(messageText);
+            message.setIntProperty("Order", i);
+            LOG.debug("Sending message [{}]", messageText);
+            producer.send(message);
+            Thread.sleep(100);
+        }
+        Thread.sleep(1 * 1000);
+
+        // restart consumerA
+        LOG.debug("Restarting consumerA");
+        consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
+        consumerA.setMessageListener(listenerA);
+
+        // restart consumerB
+        LOG.debug("restarting consumerB");
+        consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
+        consumerB.setMessageListener(listenerB);
+
+        consumerAtoConsumeCount.await(5, TimeUnit.SECONDS);
+        consumerBtoConsumeCount.await(5, TimeUnit.SECONDS);
+
+        LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount());
+
+        assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount());
+        assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount());
+
+        connection.close();
+    }
+
+    /**
+     * Setup broker with VirtualTopic configured
+     */
+    private void setupBroker(String uri) {
+        try {
+            broker = BrokerFactory.createBroker(uri);
+
+            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+            VirtualTopic virtualTopic = new VirtualTopic();
+            virtualTopic.setName("VirtualOrders.>");
+            virtualTopic.setSelectorAware(true);
+            VirtualDestination[] virtualDestinations = { virtualTopic };
+            interceptor.setVirtualDestinations(virtualDestinations);
+            broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
+
+            SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin();
+            BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin};
+            broker.setPlugins(updatedPlugins);
+
+            broker.start();
+            broker.waitUntilStarted();
+        } catch (Exception e) {
+            LOG.error("Failed creating broker", e);
+        }
+    }
+}
+
+class AMQ4899Listener implements MessageListener {
+    Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class);
+    CountDownLatch toConsume;
+    String id;
+
+    public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) {
+        this.id = id;
+        this.toConsume = toConsume;
+        started.countDown();
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        toConsume.countDown();
+        try {
+            if (message instanceof TextMessage) {
+                TextMessage textMessage = (TextMessage) message;
+                LOG.debug("Listener {} received [{}]", id, textMessage.getText());
+            } else {
+                LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName());
+            }
+        } catch (JMSException e) {
+            LOG.error("Unexpected JMSException in Listener " + id, e);
+        }
+    }
+}