You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/15 12:36:25 UTC

svn commit: r1326298 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/virtual/ main/java/org/apache/activemq/plugin/ test/java/org/apache/activemq/broker/virtual/ test/java/org/apache/activemq/spring/ test/resources/o...

Author: rajdavies
Date: Sun Apr 15 10:36:24 2012
New Revision: 1326298

URL: http://svn.apache.org/viewvc?rev=1326298&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3004

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
      - copied, changed from r1326054, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml
      - copied, changed from r1326054, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java?rev=1326298&r1=1326297&r2=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java Sun Apr 15 10:36:24 2012
@@ -16,20 +16,29 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.util.LRUCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
+    private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
+    LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
+    private SubQueueSelectorCacheBroker selectorCachePlugin;
 
     public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
         super(next, prefix, postfix, local);
@@ -45,24 +54,81 @@ public class SelectorAwareVirtualTopicIn
         Set<Destination> destinations = broker.getDestinations(destination);
 
         for (Destination dest : destinations) {
-            if (matchesSomeConsumer(message, dest)) {
+            if (matchesSomeConsumer(broker, message, dest)) {
                 dest.send(context, message.copy());
             }
         }
     }
-    
-    private boolean matchesSomeConsumer(Message message, Destination dest) throws IOException {
+
+    private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
         boolean matches = false;
         MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
         msgContext.setDestination(dest.getActiveMQDestination());
         msgContext.setMessageReference(message);
         List<Subscription> subs = dest.getConsumers();
-        for (Subscription sub: subs) {
+        for (Subscription sub : subs) {
             if (sub.matches(message, msgContext)) {
                 matches = true;
                 break;
+
+            }
+        }
+        if (matches == false && subs.size() == 0) {
+            matches = tryMatchingCachedSubs(broker, dest, msgContext);
+        }
+        return matches;
+    }
+
+    private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
+        boolean matches = false;
+        LOG.debug("No active consumer match found. Will try cache if configured...");
+
+        //retrieve the specific plugin class and lookup the selector for the destination.
+        final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
+
+        if (cache != null) {
+            final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
+            if (selector != null) {
+                try {
+                    final BooleanExpression expression = getExpression(selector);
+                    matches = expression.matches(msgContext);
+                } catch (Exception e) {
+                    LOG.error(e.getMessage(), e);
+                }
             }
         }
         return matches;
     }
+
+    private BooleanExpression getExpression(String selector) throws Exception{
+        BooleanExpression result;
+        synchronized(expressionCache){
+            result = expressionCache.get(selector);
+            if (result == null){
+                result = compileSelector(selector);
+                expressionCache.put(selector,result);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available.
+     */
+    private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) {
+        if (selectorCachePlugin == null) {
+            selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class);
+        } //if
+
+        return selectorCachePlugin;
+    }
+
+    /**
+     * Pre-compile the JMS selector.
+     *
+     * @param selectorExpression The non-null JMS selector expression.
+     */
+    private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
+        return SelectorParser.parse(selectorExpression);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?rev=1326298&r1=1326297&r2=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java Sun Apr 15 10:36:24 2012
@@ -22,6 +22,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.util.LRUCache;
 
 /**
  * A Destination which implements <a
@@ -34,6 +35,7 @@ public class VirtualTopicInterceptor ext
     private String prefix;
     private String postfix;
     private boolean local;
+    private LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new LRUCache<ActiveMQDestination,ActiveMQQueue>();
 
     public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
         super(next);
@@ -51,6 +53,14 @@ public class VirtualTopicInterceptor ext
     }
 
     protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
-        return new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
+        ActiveMQQueue queue;
+        synchronized(cache){
+            queue = cache.get(original);
+            if (queue==null){
+                queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
+                cache.put(original,queue);
+            }
+        }
+        return queue;
     }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java?rev=1326298&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java Sun Apr 15 10:36:24 2012
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A plugin which allows the caching of the selector from a subscription queue.
+ * <p/>
+ * This stops the build-up of unwanted messages, especially when consumers may
+ * disconnect from time to time when using virtual destinations.
+ * <p/>
+ * This is influenced by code snippets developed by Maciej Rakowicz
+ *
+ * @author Roelof Naude roelof(dot)naude(at)gmail.com
+ * @see https://issues.apache.org/activemq/browse/AMQ-3004
+ * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
+ */
+public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
+
+    /**
+     * The subscription's selector cache. We cache compiled expressions keyed
+     * by the target destination.
+     */
+    private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
+
+    private final File persistFile;
+
+    private boolean running = true;
+    private Thread persistThread;
+    private static final long MAX_PERSIST_INTERVAL = 600000;
+    private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
+
+    /**
+     * Constructor
+     */
+    public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
+        super(next);
+        this.persistFile = persistFile;
+        LOG.info("Using persisted selector cache from[" + persistFile + "]");
+
+        readCache();
+
+        persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
+        persistThread.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        running = false;
+        if (persistThread != null) {
+            persistThread.interrupt();
+            persistThread.join();
+        } //if
+    }
+
+    @Override
+    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+        LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName());
+        if (info.getSelector() != null) {
+            subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector());
+        } //if
+        return super.addConsumer(context, info);
+    }
+
+    private void readCache() {
+        if (persistFile != null && persistFile.exists()) {
+            try {
+                FileInputStream fis = new FileInputStream(persistFile);
+                try {
+                    ObjectInputStream in = new ObjectInputStream(fis);
+                    try {
+                        subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
+                    } catch (ClassNotFoundException ex) {
+                        LOG.error("Invalid selector cache data found. Please remove file.", ex);
+                    } finally {
+                        in.close();
+                    } //try
+                } finally {
+                    fis.close();
+                } //try
+            } catch (IOException ex) {
+                LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
+            } //try
+        } //if
+    }
+
+    /**
+     * Persist the selector cache.
+     */
+    private void persistCache() {
+        LOG.debug("Persisting selector cache....");
+        try {
+            FileOutputStream fos = new FileOutputStream(persistFile);
+            try {
+                ObjectOutputStream out = new ObjectOutputStream(fos);
+                try {
+                    out.writeObject(subSelectorCache);
+                } finally {
+                    out.flush();
+                    out.close();
+                } //try
+            } catch (IOException ex) {
+                LOG.error("Unable to persist selector cache", ex);
+            } finally {
+                fos.close();
+            } //try
+        } catch (IOException ex) {
+            LOG.error("Unable to access file[" + persistFile + "]", ex);
+        } //try
+    }
+
+    /**
+     * @return The JMS selector for the specified {@code destination}
+     */
+    public String getSelector(final String destination) {
+        return subSelectorCache.get(destination);
+    }
+
+    /**
+     * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
+     *
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        while (running) {
+            try {
+                Thread.sleep(MAX_PERSIST_INTERVAL);
+            } catch (InterruptedException ex) {
+            } //try
+
+            persistCache();
+        }
+    }
+}
+

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java?rev=1326298&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java Sun Apr 15 10:36:24 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.io.File;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPlugin;
+
+/**
+ * A plugin which allows the caching of the selector from a subscription queue.
+ * <p/>
+ * This stops the build-up of unwanted messages, especially when consumers may
+ * disconnect from time to time when using virtual destinations.
+ * <p/>
+ * This is influenced by code snippets developed by Maciej Rakowicz
+ *
+ * @author Roelof Naude roelof(dot)naude(at)gmail.com
+ *@org.apache.xbean.XBean element="virtualSelectorCacheBrokerPlugin"
+ */
+public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin {
+
+
+    private File persistFile;
+
+    @Override
+    public Broker installPlugin(Broker broker) throws Exception {
+        return new SubQueueSelectorCacheBroker(broker, persistFile);
+    }
+
+    /**
+     * Sets the location of the persistent cache
+     */
+    public void setPersistFile(File persistFile) {
+        this.persistFile = persistFile;
+    }
+
+    public File getPersistFile() {
+        return persistFile;
+    }
+}

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java (from r1326054, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java&r1=1326054&r2=1326298&rev=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java Sun Apr 15 10:36:24 2012
@@ -16,58 +16,133 @@
  */
 package org.apache.activemq.broker.virtual;
 
+import java.net.URI;
+
+import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
+import javax.jms.TextMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.spring.ConsumerBean;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class VirtualTopicSelectorTest extends CompositeTopicTest {
+/**
+ * Test case for  https://issues.apache.org/jira/browse/AMQ-3004
+ */
+
+public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
+    protected Connection connection;
+    protected int total = 3000;
+    protected String messageSelector;
+
+    public void testVirtualTopicDisconnect() throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        final ConsumerBean messageList = new ConsumerBean();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        Destination producerDestination = getProducerDestination();
+        Destination destination = getConsumerDsetination();
+
+        LOG.info("Sending to: " + producerDestination);
+        LOG.info("Consuming from: " + destination );
+
+        MessageConsumer consumer = session.createConsumer(destination, messageSelector);
 
-    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorTest.class);
+        MessageListener listener = new MessageListener(){
+            public void onMessage(Message message){
+                messageList.onMessage(message);
+                try {
+                    message.acknowledge();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        consumer.setMessageListener(listener);
+
+
+        // create topic producer
+        MessageProducer producer = session.createProducer(producerDestination);
+        assertNotNull(producer);
+
+        int disconnectCount = total/3;
+        int reconnectCount = (total * 2)/3;
+
+        for (int i = 0; i < total; i++) {
+            producer.send(createMessage(session, i));
+
+            if (i==disconnectCount){
+               consumer.close();
+            }
+            if (i==reconnectCount){
+                consumer = session.createConsumer(destination, messageSelector);
+                consumer.setMessageListener(listener);
+            }
+        }
+
+        assertMessagesArrived(messageList,total/2,10000);
+    }
             
-    protected Destination getConsumer1Dsetination() {
-        return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
+    protected Destination getConsumerDsetination() {
+        return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
     }
 
-    protected Destination getConsumer2Dsetination() {
-        return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
-    }
-    
+
     protected Destination getProducerDestination() {
         return new ActiveMQTopic("VirtualTopic.TEST");
     }
-    
-    @Override
-    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
-        messageList1.assertMessagesArrived(total/2);
-        messageList2.assertMessagesArrived(total/2);
- 
-        messageList1.flushMessages();
-        messageList2.flushMessages();
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        messageSelector = "odd = 'no'";
+    }
+
+    protected TextMessage createMessage(Session session, int i) throws JMSException {
+        TextMessage textMessage = session.createTextMessage("message: " + i);
+        if (i % 2 != 0) {
+            textMessage.setStringProperty("odd", "yes");
+        } else {
+            textMessage.setStringProperty("odd", "no");
+        }
+        textMessage.setIntProperty("i", i);
+        return textMessage;
+    }
+
+
+
+    protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) {
+        messageList.assertMessagesArrived(expected,timeout);
+
+        messageList.flushMessages();
+
         
         LOG.info("validate no other messages on queues");
         try {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 
-            Destination destination1 = getConsumer1Dsetination();
-            Destination destination2 = getConsumer2Dsetination();
+            Destination destination1 = getConsumerDsetination();
+
             MessageConsumer c1 = session.createConsumer(destination1, null);
-            MessageConsumer c2 = session.createConsumer(destination2, null);
-            c1.setMessageListener(messageList1);
-            c2.setMessageListener(messageList2);
-            
+            c1.setMessageListener(messageList);
+
             
             LOG.info("send one simple message that should go to both consumers");
             MessageProducer producer = session.createProducer(getProducerDestination());
@@ -75,31 +150,23 @@ public class VirtualTopicSelectorTest ex
             
             producer.send(session.createTextMessage("Last Message"));
             
-            messageList1.assertMessagesArrived(1);
-            messageList2.assertMessagesArrived(1);
-        
+            messageList.assertMessagesArrived(1);
+
         } catch (JMSException e) {
             e.printStackTrace();
             fail("unexpeced ex while waiting for last messages: " + e);
         }
     }
-    
-    @Override
-    protected BrokerService createBroker() throws Exception {
-        // use message selectors on consumers that need to propagate up to the virtual
-        // topic dispatch so that un matched messages do not linger on subscription queues
-        messageSelector1 = "odd = 'yes'";
-        messageSelector2 = "odd = 'no'";
-        
-        BrokerService broker = new BrokerService();
-        broker.setPersistent(false);
 
-        VirtualTopic virtualTopic = new VirtualTopic();
-        // the new config that enables selectors on the intercepter
-        virtualTopic.setSelectorAware(true);
-        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
-        interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
-        broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
-        return broker;
+
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/disconnected-selector.xml";
     }
+
+    protected BrokerService createBroker() throws Exception {
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+        return answer;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java?rev=1326298&r1=1326297&r2=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java Sun Apr 15 10:36:24 2012
@@ -92,17 +92,21 @@ public class ConsumerBean extends Assert
      *
      * @param messageCount
      */
-    public void waitForMessagesToArrive(int messageCount) {
+
+    public void waitForMessagesToArrive(int messageCount){
+        waitForMessagesToArrive(messageCount,120 * 1000);
+    }
+    public void waitForMessagesToArrive(int messageCount,long maxWaitTime) {
         long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
         LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
         long start = System.currentTimeMillis();
-        long maxWaitTime = start + 120 * 1000;
+        long endTime = start + maxWaitTime;
         while (maxRemainingMessageCount > 0) {
             try {
                 synchronized (messages) {
                     messages.wait(1000);
                 }
-                if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > maxWaitTime) {
+                if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
                     break;
                 }
             } catch (InterruptedException e) {
@@ -123,6 +127,15 @@ public class ConsumerBean extends Assert
         }
     }
 
+    public void assertMessagesArrived(int total, long maxWaitTime) {
+        waitForMessagesToArrive(total,maxWaitTime);
+        synchronized (messages) {
+            int count = messages.size();
+
+            assertEquals("Messages received", total, count);
+        }
+    }
+
     public boolean isVerbose() {
         return verbose;
     }

Copied: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml (from r1326054, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml?p2=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml&r1=1326054&r2=1326298&rev=1326298&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml Sun Apr 15 10:36:24 2012
@@ -18,30 +18,26 @@
 
 <!-- this file can only be parsed using the xbean-spring library -->
 <!-- START SNIPPET: xbean -->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:amq="http://activemq.apache.org/schema/core"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
-  <broker xmlns="http://activemq.apache.org/schema/core">
-    <destinationInterceptors>
-      <virtualDestinationInterceptor>
-        <virtualDestinations>
-          <compositeQueue name="MY.QUEUE">
-            <forwardTo>
-              <filteredDestination selector="odd = 'yes'" queue="FOO"/>
-              <filteredDestination selector="i = 5" topic="BAR"/>
-            </forwardTo>
-          </compositeQueue>
-        </virtualDestinations>
-      </virtualDestinationInterceptor>
-    </destinationInterceptors>
-
-  </broker>
+      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
 
+    <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
+        <destinationInterceptors>
+            <virtualDestinationInterceptor>
+                <virtualDestinations>
+                    <virtualTopic name="VirtualTopic.>" prefix="Consumer." selectorAware="true"/>
+                </virtualDestinations>
+            </virtualDestinationInterceptor>
+        </destinationInterceptors>
+        <plugins>
+            <virtualSelectorCacheBrokerPlugin persistFile = "selectorcache.data"/>
+        </plugins>
+    </broker>
 </beans>
 <!-- END SNIPPET: xbean -->