You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2014/02/07 09:42:05 UTC
git commit: Fix for AMQ-4899
Updated Branches:
refs/heads/trunk f88043eaf -> afded924f
Fix for AMQ-4899
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/afded924
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/afded924
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/afded924
Branch: refs/heads/trunk
Commit: afded924ffde28c1f491db761774e14c5e954413
Parents: f88043e
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Fri Feb 7 09:41:50 2014 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Fri Feb 7 09:41:50 2014 +0100
----------------------------------------------------------------------
.../SelectorAwareVirtualTopicInterceptor.java | 18 +-
.../plugin/SubQueueSelectorCacheBroker.java | 35 ++--
.../org/apache/activemq/bugs/AMQ4899Test.java | 192 +++++++++++++++++++
3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
index 8d58a43..0c19565 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
@@ -16,10 +16,6 @@
*/
package org.apache.activemq.broker.region.virtual;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
@@ -35,6 +31,10 @@ import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
@@ -70,10 +70,9 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
if (sub.matches(message, msgContext)) {
matches = true;
break;
-
}
}
- if (matches == false && subs.size() == 0) {
+ if (matches == false) {
matches = tryMatchingCachedSubs(broker, dest, msgContext);
}
return matches;
@@ -87,11 +86,14 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
if (cache != null) {
- final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
- if (selector != null) {
+ final Set<String> selectors = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
+ for (String selector : selectors) {
try {
final BooleanExpression expression = getExpression(selector);
matches = expression.matches(msgContext);
+ if (matches) {
+ return true;
+ }
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
index cfa50f7..af02b54 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
@@ -16,14 +16,6 @@
*/
package org.apache.activemq.plugin;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@@ -32,6 +24,17 @@ import org.apache.activemq.command.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* A plugin which allows the caching of the selector from a subscription queue.
* <p/>
@@ -51,7 +54,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
* The subscription's selector cache. We cache compiled expressions keyed
* by the target destination.
*/
- private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
+ private ConcurrentHashMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
private final File persistFile;
@@ -85,7 +88,8 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
- LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), info.getDestination().getQualifiedName());
+ String destinationName = info.getDestination().getQualifiedName();
+ LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), destinationName);
String selector = info.getSelector();
// As ConcurrentHashMap doesn't support null values, use always true expression
@@ -93,7 +97,12 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
selector = "TRUE";
}
- subSelectorCache.put(info.getDestination().getQualifiedName(), selector);
+ Set<String> selectors = subSelectorCache.get(destinationName);
+ if (selectors == null) {
+ selectors = Collections.synchronizedSet(new HashSet<String>());
+ }
+ selectors.add(selector);
+ subSelectorCache.put(destinationName, selectors);
return super.addConsumer(context, info);
}
@@ -105,7 +114,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
try {
ObjectInputStream in = new ObjectInputStream(fis);
try {
- subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
+ subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject();
} catch (ClassNotFoundException ex) {
LOG.error("Invalid selector cache data found. Please remove file.", ex);
} finally {
@@ -148,7 +157,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
/**
* @return The JMS selector for the specified {@code destination}
*/
- public String getSelector(final String destination) {
+ public Set<String> getSelector(final String destination) {
return subSelectorCache.get(destination);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
new file mode 100644
index 0000000..81140ce
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ4899Test {
+ protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class);
+ private static final String QUEUE_NAME="AMQ4899TestQueue";
+ private static final String CONSUMER_QUEUE="Consumer.Orders.VirtualOrders." + QUEUE_NAME;
+ private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." + QUEUE_NAME;
+
+ private static final Integer MESSAGE_LIMIT = 20;
+ public static final String CONSUMER_A_SELECTOR = "Order < " + 10;
+ public static String CONSUMER_B_SELECTOR = "Order >= " + 10;
+ private CountDownLatch consumersStarted = new CountDownLatch(2);
+ private CountDownLatch consumerAtoConsumeCount= new CountDownLatch(10);
+ private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10);
+
+ private BrokerService broker;
+
+ @Before
+ public void setUp() {
+ setupBroker("broker://()/localhost?");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testVirtualTopicMultipleSelectors() throws Exception{
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue consumerQueue = session.createQueue(CONSUMER_QUEUE);
+
+ MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount);
+ MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
+ consumerA.setMessageListener(listenerA);
+
+ MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount);
+ MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
+ consumerB.setMessageListener(listenerB);
+
+ consumersStarted.await(10, TimeUnit.SECONDS);
+ assertEquals("Not all consumers started in time", 0, consumersStarted.getCount());
+
+ Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME);
+ MessageProducer producer = session.createProducer(producerDestination);
+ int messageIndex = 0;
+ for (int i=0; i < MESSAGE_LIMIT; i++) {
+ if (i==3) {
+ LOG.debug("Stopping consumerA");
+ consumerA.close();
+ }
+
+ if (i == 14) {
+ LOG.debug("Stopping consumer B");
+ consumerB.close();
+ }
+ String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString();
+ TextMessage message = session.createTextMessage(messageText);
+ message.setIntProperty("Order", i);
+ LOG.debug("Sending message [{}]", messageText);
+ producer.send(message);
+ Thread.sleep(100);
+ }
+ Thread.sleep(1 * 1000);
+
+ // restart consumerA
+ LOG.debug("Restarting consumerA");
+ consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
+ consumerA.setMessageListener(listenerA);
+
+ // restart consumerB
+ LOG.debug("restarting consumerB");
+ consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
+ consumerB.setMessageListener(listenerB);
+
+ consumerAtoConsumeCount.await(5, TimeUnit.SECONDS);
+ consumerBtoConsumeCount.await(5, TimeUnit.SECONDS);
+
+ LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount());
+
+ assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount());
+ assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount());
+
+ connection.close();
+ }
+
+ /**
+ * Setup broker with VirtualTopic configured
+ */
+ private void setupBroker(String uri) {
+ try {
+ broker = BrokerFactory.createBroker(uri);
+
+ VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+ VirtualTopic virtualTopic = new VirtualTopic();
+ virtualTopic.setName("VirtualOrders.>");
+ virtualTopic.setSelectorAware(true);
+ VirtualDestination[] virtualDestinations = { virtualTopic };
+ interceptor.setVirtualDestinations(virtualDestinations);
+ broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
+
+ SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin();
+ BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin};
+ broker.setPlugins(updatedPlugins);
+
+ broker.start();
+ broker.waitUntilStarted();
+ } catch (Exception e) {
+ LOG.error("Failed creating broker", e);
+ }
+ }
+}
+
+class AMQ4899Listener implements MessageListener {
+ Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class);
+ CountDownLatch toConsume;
+ String id;
+
+ public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) {
+ this.id = id;
+ this.toConsume = toConsume;
+ started.countDown();
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ toConsume.countDown();
+ try {
+ if (message instanceof TextMessage) {
+ TextMessage textMessage = (TextMessage) message;
+ LOG.debug("Listener {} received [{}]", id, textMessage.getText());
+ } else {
+ LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName());
+ }
+ } catch (JMSException e) {
+ LOG.error("Unexpected JMSException in Listener " + id, e);
+ }
+ }
+}