You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/15 12:36:25 UTC
svn commit: r1326298 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/virtual/
main/java/org/apache/activemq/plugin/
test/java/org/apache/activemq/broker/virtual/
test/java/org/apache/activemq/spring/ test/resources/o...
Author: rajdavies
Date: Sun Apr 15 10:36:24 2012
New Revision: 1326298
URL: http://svn.apache.org/viewvc?rev=1326298&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3004
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
- copied, changed from r1326054, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml
- copied, changed from r1326054, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java?rev=1326298&r1=1326297&r2=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java Sun Apr 15 10:36:24 2012
@@ -16,20 +16,29 @@
*/
package org.apache.activemq.broker.region.virtual;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.util.LRUCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
+ private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
+ LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
+ private SubQueueSelectorCacheBroker selectorCachePlugin;
public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next, prefix, postfix, local);
@@ -45,24 +54,81 @@ public class SelectorAwareVirtualTopicIn
Set<Destination> destinations = broker.getDestinations(destination);
for (Destination dest : destinations) {
- if (matchesSomeConsumer(message, dest)) {
+ if (matchesSomeConsumer(broker, message, dest)) {
dest.send(context, message.copy());
}
}
}
-
- private boolean matchesSomeConsumer(Message message, Destination dest) throws IOException {
+
+ private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
boolean matches = false;
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(dest.getActiveMQDestination());
msgContext.setMessageReference(message);
List<Subscription> subs = dest.getConsumers();
- for (Subscription sub: subs) {
+ for (Subscription sub : subs) {
if (sub.matches(message, msgContext)) {
matches = true;
break;
+
+ }
+ }
+ if (matches == false && subs.size() == 0) {
+ matches = tryMatchingCachedSubs(broker, dest, msgContext);
+ }
+ return matches;
+ }
+
+ private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
+ boolean matches = false;
+ LOG.debug("No active consumer match found. Will try cache if configured...");
+
+ //retrieve the specific plugin class and lookup the selector for the destination.
+ final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
+
+ if (cache != null) {
+ final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
+ if (selector != null) {
+ try {
+ final BooleanExpression expression = getExpression(selector);
+ matches = expression.matches(msgContext);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
}
}
return matches;
}
+
+ private BooleanExpression getExpression(String selector) throws Exception{
+ BooleanExpression result;
+ synchronized(expressionCache){
+ result = expressionCache.get(selector);
+ if (result == null){
+ result = compileSelector(selector);
+ expressionCache.put(selector,result);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available.
+ */
+ private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) {
+ if (selectorCachePlugin == null) {
+ selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class);
+ } //if
+
+ return selectorCachePlugin;
+ }
+
+ /**
+ * Pre-compile the JMS selector.
+ *
+ * @param selectorExpression The non-null JMS selector expression.
+ */
+ private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
+ return SelectorParser.parse(selectorExpression);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?rev=1326298&r1=1326297&r2=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java Sun Apr 15 10:36:24 2012
@@ -22,6 +22,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
+import org.apache.activemq.util.LRUCache;
/**
* A Destination which implements <a
@@ -34,6 +35,7 @@ public class VirtualTopicInterceptor ext
private String prefix;
private String postfix;
private boolean local;
+ private LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new LRUCache<ActiveMQDestination,ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next);
@@ -51,6 +53,14 @@ public class VirtualTopicInterceptor ext
}
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
- return new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
+ ActiveMQQueue queue;
+ synchronized(cache){
+ queue = cache.get(original);
+ if (queue==null){
+ queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
+ cache.put(original,queue);
+ }
+ }
+ return queue;
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java?rev=1326298&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java Sun Apr 15 10:36:24 2012
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A plugin which allows the caching of the selector from a subscription queue.
+ * <p/>
+ * This stops the build-up of unwanted messages, especially when consumers may
+ * disconnect from time to time when using virtual destinations.
+ * <p/>
+ * This is influenced by code snippets developed by Maciej Rakowicz
+ *
+ * @author Roelof Naude roelof(dot)naude(at)gmail.com
+ * @see https://issues.apache.org/jira/browse/AMQ-3004
+ * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
+ */
+public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
+
+ /**
+ * The subscription's selector cache. We cache the raw selector string of
+ * the latest consumer, keyed by the destination's qualified name.
+ */
+ private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
+
+ private final File persistFile;
+
+ private boolean running = true;
+ private Thread persistThread;
+ private static final long MAX_PERSIST_INTERVAL = 600000;
+ private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
+
+ /**
+ * Constructor
+ */
+ public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
+ super(next);
+ this.persistFile = persistFile;
+ LOG.info("Using persisted selector cache from[" + persistFile + "]");
+
+ readCache();
+
+ persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
+ persistThread.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ running = false;
+ if (persistThread != null) {
+ persistThread.interrupt();
+ persistThread.join();
+ } //if
+ }
+
+ @Override
+ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+ LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName());
+ if (info.getSelector() != null) {
+ subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector());
+ } //if
+ return super.addConsumer(context, info);
+ }
+
+ private void readCache() {
+ if (persistFile != null && persistFile.exists()) {
+ try {
+ FileInputStream fis = new FileInputStream(persistFile);
+ try {
+ ObjectInputStream in = new ObjectInputStream(fis);
+ try {
+ subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Invalid selector cache data found. Please remove file.", ex);
+ } finally {
+ in.close();
+ } //try
+ } finally {
+ fis.close();
+ } //try
+ } catch (IOException ex) {
+ LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
+ } //try
+ } //if
+ }
+
+ /**
+ * Persist the selector cache.
+ */
+ private void persistCache() {
+ LOG.debug("Persisting selector cache....");
+ try {
+ FileOutputStream fos = new FileOutputStream(persistFile);
+ try {
+ ObjectOutputStream out = new ObjectOutputStream(fos);
+ try {
+ out.writeObject(subSelectorCache);
+ } finally {
+ out.flush();
+ out.close();
+ } //try
+ } catch (IOException ex) {
+ LOG.error("Unable to persist selector cache", ex);
+ } finally {
+ fos.close();
+ } //try
+ } catch (IOException ex) {
+ LOG.error("Unable to access file[" + persistFile + "]", ex);
+ } //try
+ }
+
+ /**
+ * @return The JMS selector for the specified {@code destination}
+ */
+ public String getSelector(final String destination) {
+ return subSelectorCache.get(destination);
+ }
+
+ /**
+ * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
+ *
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ while (running) {
+ try {
+ Thread.sleep(MAX_PERSIST_INTERVAL);
+ } catch (InterruptedException ex) {
+ } //try
+
+ persistCache();
+ }
+ }
+}
+
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java?rev=1326298&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java Sun Apr 15 10:36:24 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.io.File;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPlugin;
+
+/**
+ * A plugin which allows the caching of the selector from a subscription queue.
+ * <p/>
+ * This stops the build-up of unwanted messages, especially when consumers may
+ * disconnect from time to time when using virtual destinations.
+ * <p/>
+ * This is influenced by code snippets developed by Maciej Rakowicz
+ *
+ * @author Roelof Naude roelof(dot)naude(at)gmail.com
+ *@org.apache.xbean.XBean element="virtualSelectorCacheBrokerPlugin"
+ */
+public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin {
+
+
+ private File persistFile;
+
+ @Override
+ public Broker installPlugin(Broker broker) throws Exception {
+ return new SubQueueSelectorCacheBroker(broker, persistFile);
+ }
+
+ /**
+ * Sets the location of the persistent cache
+ */
+ public void setPersistFile(File persistFile) {
+ this.persistFile = persistFile;
+ }
+
+ public File getPersistFile() {
+ return persistFile;
+ }
+}
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java (from r1326054, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java&r1=1326054&r2=1326298&rev=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java Sun Apr 15 10:36:24 2012
@@ -16,58 +16,133 @@
*/
package org.apache.activemq.broker.virtual;
+import java.net.URI;
+
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
-
+import javax.jms.TextMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class VirtualTopicSelectorTest extends CompositeTopicTest {
+/**
+ * Test case for https://issues.apache.org/jira/browse/AMQ-3004
+ */
+
+public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
+ protected Connection connection;
+ protected int total = 3000;
+ protected String messageSelector;
+
+ public void testVirtualTopicDisconnect() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ final ConsumerBean messageList = new ConsumerBean();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ Destination producerDestination = getProducerDestination();
+ Destination destination = getConsumerDsetination();
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination );
+
+ MessageConsumer consumer = session.createConsumer(destination, messageSelector);
- private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorTest.class);
+ MessageListener listener = new MessageListener(){
+ public void onMessage(Message message){
+ messageList.onMessage(message);
+ try {
+ message.acknowledge();
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ consumer.setMessageListener(listener);
+
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ int disconnectCount = total/3;
+ int reconnectCount = (total * 2)/3;
+
+ for (int i = 0; i < total; i++) {
+ producer.send(createMessage(session, i));
+
+ if (i==disconnectCount){
+ consumer.close();
+ }
+ if (i==reconnectCount){
+ consumer = session.createConsumer(destination, messageSelector);
+ consumer.setMessageListener(listener);
+ }
+ }
+
+ assertMessagesArrived(messageList,total/2,10000);
+ }
- protected Destination getConsumer1Dsetination() {
- return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
+ protected Destination getConsumerDsetination() {
+ return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
}
- protected Destination getConsumer2Dsetination() {
- return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
- }
-
+
protected Destination getProducerDestination() {
return new ActiveMQTopic("VirtualTopic.TEST");
}
-
- @Override
- protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
- messageList1.assertMessagesArrived(total/2);
- messageList2.assertMessagesArrived(total/2);
-
- messageList1.flushMessages();
- messageList2.flushMessages();
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ messageSelector = "odd = 'no'";
+ }
+
+ protected TextMessage createMessage(Session session, int i) throws JMSException {
+ TextMessage textMessage = session.createTextMessage("message: " + i);
+ if (i % 2 != 0) {
+ textMessage.setStringProperty("odd", "yes");
+ } else {
+ textMessage.setStringProperty("odd", "no");
+ }
+ textMessage.setIntProperty("i", i);
+ return textMessage;
+ }
+
+
+
+ protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) {
+ messageList.assertMessagesArrived(expected,timeout);
+
+ messageList.flushMessages();
+
LOG.info("validate no other messages on queues");
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination1 = getConsumer1Dsetination();
- Destination destination2 = getConsumer2Dsetination();
+ Destination destination1 = getConsumerDsetination();
+
MessageConsumer c1 = session.createConsumer(destination1, null);
- MessageConsumer c2 = session.createConsumer(destination2, null);
- c1.setMessageListener(messageList1);
- c2.setMessageListener(messageList2);
-
+ c1.setMessageListener(messageList);
+
LOG.info("send one simple message that should go to both consumers");
MessageProducer producer = session.createProducer(getProducerDestination());
@@ -75,31 +150,23 @@ public class VirtualTopicSelectorTest ex
producer.send(session.createTextMessage("Last Message"));
- messageList1.assertMessagesArrived(1);
- messageList2.assertMessagesArrived(1);
-
+ messageList.assertMessagesArrived(1);
+
} catch (JMSException e) {
e.printStackTrace();
fail("unexpeced ex while waiting for last messages: " + e);
}
}
-
- @Override
- protected BrokerService createBroker() throws Exception {
- // use message selectors on consumers that need to propagate up to the virtual
- // topic dispatch so that un matched messages do not linger on subscription queues
- messageSelector1 = "odd = 'yes'";
- messageSelector2 = "odd = 'no'";
-
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- VirtualTopic virtualTopic = new VirtualTopic();
- // the new config that enables selectors on the intercepter
- virtualTopic.setSelectorAware(true);
- VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
- interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
- broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
- return broker;
+
+ protected String getBrokerConfigUri() {
+ return "org/apache/activemq/broker/virtual/disconnected-selector.xml";
}
+
+ protected BrokerService createBroker() throws Exception {
+ XBeanBrokerFactory factory = new XBeanBrokerFactory();
+ BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+ return answer;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java?rev=1326298&r1=1326297&r2=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java Sun Apr 15 10:36:24 2012
@@ -92,17 +92,21 @@ public class ConsumerBean extends Assert
*
* @param messageCount
*/
- public void waitForMessagesToArrive(int messageCount) {
+
+ public void waitForMessagesToArrive(int messageCount){
+ waitForMessagesToArrive(messageCount,120 * 1000);
+ }
+ public void waitForMessagesToArrive(int messageCount,long maxWaitTime) {
long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
long start = System.currentTimeMillis();
- long maxWaitTime = start + 120 * 1000;
+ long endTime = start + maxWaitTime;
while (maxRemainingMessageCount > 0) {
try {
synchronized (messages) {
messages.wait(1000);
}
- if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > maxWaitTime) {
+ if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
break;
}
} catch (InterruptedException e) {
@@ -123,6 +127,15 @@ public class ConsumerBean extends Assert
}
}
+ public void assertMessagesArrived(int total, long maxWaitTime) {
+ waitForMessagesToArrive(total,maxWaitTime);
+ synchronized (messages) {
+ int count = messages.size();
+
+ assertEquals("Messages received", total, count);
+ }
+ }
+
public boolean isVerbose() {
return verbose;
}
Copied: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml (from r1326054, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml?p2=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml&r1=1326054&r2=1326298&rev=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml Sun Apr 15 10:36:24 2012
@@ -18,30 +18,26 @@
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
- <broker xmlns="http://activemq.apache.org/schema/core">
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <compositeQueue name="MY.QUEUE">
- <forwardTo>
- <filteredDestination selector="odd = 'yes'" queue="FOO"/>
- <filteredDestination selector="i = 5" topic="BAR"/>
- </forwardTo>
- </compositeQueue>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- </broker>
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+ <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
+ <destinationInterceptors>
+ <virtualDestinationInterceptor>
+ <virtualDestinations>
+ <virtualTopic name="VirtualTopic.>" prefix="Consumer." selectorAware="true"/>
+ </virtualDestinations>
+ </virtualDestinationInterceptor>
+ </destinationInterceptors>
+ <plugins>
+ <virtualSelectorCacheBrokerPlugin persistFile = "selectorcache.data"/>
+ </plugins>
+ </broker>
</beans>
<!-- END SNIPPET: xbean -->