You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/07/24 12:27:40 UTC

svn commit: r424994 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/spring/ test/java/org/apache/activemq/usecases/

Author: jstrachan
Date: Mon Jul 24 03:27:32 2006
New Revision: 424994

URL: http://svn.apache.org/viewvc?rev=424994&view=rev
Log:
added patch from Brian Madigan. To see thread see: http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5439965
for background see: http://incubator.apache.org/activemq/virtual-destinations.html

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=424994&r1=424993&r2=424994&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Jul 24 03:27:32 2006
@@ -62,6 +62,7 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.usecases.VirtualTopicPubSubTest;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.ServiceStopper;
@@ -122,6 +123,7 @@
     private AtomicBoolean started = new AtomicBoolean(false);
     private BrokerPlugin[] plugins;
     private boolean keepDurableSubsActive=true;
+    private boolean useVirtualTopics=true;
     private BrokerId brokerId;
 
     /**
@@ -820,6 +822,20 @@
     public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
         this.keepDurableSubsActive = keepDurableSubsActive;
     }
+    
+    public boolean isUseVirtualTopics() {
+        return useVirtualTopics;
+    }
+
+    /**
+     * Sets whether or not
+     * <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>
+     * should be supported.
+     */
+    public void setUseVirtualTopics(boolean useVirtualTopics) {
+        this.useVirtualTopics = useVirtualTopics;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
     /**
@@ -1012,6 +1028,9 @@
         broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
         if (isAdvisorySupport()) {
             broker = new AdvisoryBroker(broker);
+        }
+        if (isUseVirtualTopics()) {
+            broker = new VirtualTopicBroker(broker);
         }
         broker = new CompositeDestinationBroker(broker);
         if (isPopulateJMSXUserID()) {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java?rev=424994&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java Mon Jul 24 03:27:32 2006
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Implements <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>.
+ * 
+ * @version $Revision: $
+ */
+public class VirtualTopicBroker extends BrokerPluginSupport {
+
+    public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*.";
+
+    public VirtualTopicBroker() {
+    }
+    
+    public VirtualTopicBroker(Broker broker) {
+        setNext(broker);
+    }
+
+    public void send(ConnectionContext ctx, Message message) throws Exception {
+
+        String name = message.getDestination().getPhysicalName();
+
+        String virtualName = VIRTUAL_WILDCARD + name;
+
+        Set destinations = getDestinations(new ActiveMQQueue(virtualName));
+
+        for (Iterator iter = destinations.iterator(); iter.hasNext();) {
+            Destination dest = (Destination) iter.next();
+            dest.send(ctx, message);
+        }
+        getNext().send(ctx, message);
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java?rev=424994&r1=424993&r2=424994&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java Mon Jul 24 03:27:32 2006
@@ -21,12 +21,15 @@
 import java.util.ArrayList;
 import java.util.List;
 
-public class ConsumerBean implements MessageListener {
+import junit.framework.Assert;
+
+public class ConsumerBean extends Assert implements MessageListener {
     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
             .getLog(ConsumerBean.class);
-    
+
     private List messages = new ArrayList();
     private Object semaphore;
+    private boolean verbose;
 
     /**
      * Constructor.
@@ -37,6 +40,7 @@
 
     /**
      * Constructor, initialized semaphore object.
+     * 
      * @param semaphore
      */
     public ConsumerBean(Object semaphore) {
@@ -54,10 +58,14 @@
 
     /**
      * Method implemented from MessageListener interface.
+     * 
      * @param message
      */
     public synchronized void onMessage(Message message) {
         messages.add(message);
+        if (verbose) {
+            log.info("Received: " + message);
+        }
         synchronized (semaphore) {
             semaphore.notifyAll();
         }
@@ -88,6 +96,7 @@
 
     /**
      * Used to wait for a message to arrive given a particular message count.
+     * 
      * @param messageCount
      */
     public void waitForMessagesToArrive(int messageCount) {
@@ -113,8 +122,26 @@
         log.info("End of wait for " + end + " millis");
     }
 
+    public void assertMessagesArrived(int total) {
+        waitForMessagesToArrive(total);
+        synchronized (this) {
+            int count = messages.size();
+
+            assertEquals("Messages received", total, count);
+        }
+    }
+
+    public boolean isVerbose() {
+        return verbose;
+    }
+
+    public void setVerbose(boolean verbose) {
+        this.verbose = verbose;
+    }
+
     /**
      * Identifies if the message is empty.
+     * 
      * @return
      */
     protected boolean hasReceivedMessage() {
@@ -123,6 +150,7 @@
 
     /**
      * Identifies if the message count has reached the total size of message.
+     * 
      * @param messageCount
      * @return
      */

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java?rev=424994&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java Mon Jul 24 03:27:32 2006
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * 
+ * @version $Revision: $
+ */
+public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
+
+    private Connection connection;
+
+    public void testVirtualTopicCreation() throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        ConsumerBean messageList = new ConsumerBean();
+        messageList.setVerbose(true);
+        
+        String queueAName = "ActiveMQ.Virtual.A.TEST";
+        // create consumer 'cluster'
+        ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
+        ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer c1 = session.createConsumer(queue1);
+        MessageConsumer c2 = session.createConsumer(queue2);
+
+        c1.setMessageListener(messageList);
+        c2.setMessageListener(messageList);
+
+        // create topic producer
+        MessageProducer producer = session.createProducer(new ActiveMQTopic("TEST"));
+        assertNotNull(producer);
+
+        int total = 10;
+        for (int i = 0; i < total; i++) {
+            producer.send(session.createTextMessage("message: " + i));
+        }
+        
+        messageList.assertMessagesArrived(total);
+    }
+
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java
------------------------------------------------------------------------------
    svn:eol-style = native