You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/07/24 12:27:40 UTC
svn commit: r424994 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/spring/
test/java/org/apache/activemq/usecases/
Author: jstrachan
Date: Mon Jul 24 03:27:32 2006
New Revision: 424994
URL: http://svn.apache.org/viewvc?rev=424994&view=rev
Log:
added patch from Brian Madigan. To see thread see: http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5439965
for background see: http://incubator.apache.org/activemq/virtual-destinations.html
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=424994&r1=424993&r2=424994&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Jul 24 03:27:32 2006
@@ -62,6 +62,7 @@
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.usecases.VirtualTopicPubSubTest;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
@@ -122,6 +123,7 @@
private AtomicBoolean started = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive=true;
+ private boolean useVirtualTopics=true;
private BrokerId brokerId;
/**
@@ -820,6 +822,20 @@
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
+
+ public boolean isUseVirtualTopics() {
+ return useVirtualTopics;
+ }
+
+ /**
+ * Sets whether or not
+ * <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>
+ * should be supported.
+ */
+ public void setUseVirtualTopics(boolean useVirtualTopics) {
+ this.useVirtualTopics = useVirtualTopics;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
/**
@@ -1012,6 +1028,9 @@
broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
if (isAdvisorySupport()) {
broker = new AdvisoryBroker(broker);
+ }
+ if (isUseVirtualTopics()) {
+ broker = new VirtualTopicBroker(broker);
}
broker = new CompositeDestinationBroker(broker);
if (isPopulateJMSXUserID()) {
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java?rev=424994&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java Mon Jul 24 03:27:32 2006
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Implements <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>.
+ *
+ * @version $Revision: $
+ */
+public class VirtualTopicBroker extends BrokerPluginSupport {
+
+ public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*.";
+
+ public VirtualTopicBroker() {
+ }
+
+ public VirtualTopicBroker(Broker broker) {
+ setNext(broker);
+ }
+
+ public void send(ConnectionContext ctx, Message message) throws Exception {
+
+ String name = message.getDestination().getPhysicalName();
+
+ String virtualName = VIRTUAL_WILDCARD + name;
+
+ Set destinations = getDestinations(new ActiveMQQueue(virtualName));
+
+ for (Iterator iter = destinations.iterator(); iter.hasNext();) {
+ Destination dest = (Destination) iter.next();
+ dest.send(ctx, message);
+ }
+ getNext().send(ctx, message);
+ }
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/VirtualTopicBroker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java?rev=424994&r1=424993&r2=424994&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java Mon Jul 24 03:27:32 2006
@@ -21,12 +21,15 @@
import java.util.ArrayList;
import java.util.List;
-public class ConsumerBean implements MessageListener {
+import junit.framework.Assert;
+
+public class ConsumerBean extends Assert implements MessageListener {
private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(ConsumerBean.class);
-
+
private List messages = new ArrayList();
private Object semaphore;
+ private boolean verbose;
/**
* Constructor.
@@ -37,6 +40,7 @@
/**
* Constructor, initialized semaphore object.
+ *
* @param semaphore
*/
public ConsumerBean(Object semaphore) {
@@ -54,10 +58,14 @@
/**
* Method implemented from MessageListener interface.
+ *
* @param message
*/
public synchronized void onMessage(Message message) {
messages.add(message);
+ if (verbose) {
+ log.info("Received: " + message);
+ }
synchronized (semaphore) {
semaphore.notifyAll();
}
@@ -88,6 +96,7 @@
/**
* Used to wait for a message to arrive given a particular message count.
+ *
* @param messageCount
*/
public void waitForMessagesToArrive(int messageCount) {
@@ -113,8 +122,26 @@
log.info("End of wait for " + end + " millis");
}
+ public void assertMessagesArrived(int total) {
+ waitForMessagesToArrive(total);
+ synchronized (this) {
+ int count = messages.size();
+
+ assertEquals("Messages received", total, count);
+ }
+ }
+
+ public boolean isVerbose() {
+ return verbose;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
/**
* Identifies if the message is empty.
+ *
* @return
*/
protected boolean hasReceivedMessage() {
@@ -123,6 +150,7 @@
/**
* Identifies if the message count has reached the total size of message.
+ *
* @param messageCount
* @return
*/
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java?rev=424994&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java Mon Jul 24 03:27:32 2006
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ *
+ * @version $Revision: $
+ */
+public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
+
+ private Connection connection;
+
+ public void testVirtualTopicCreation() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList = new ConsumerBean();
+ messageList.setVerbose(true);
+
+ String queueAName = "ActiveMQ.Virtual.A.TEST";
+ // create consumer 'cluster'
+ ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
+ ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer c1 = session.createConsumer(queue1);
+ MessageConsumer c2 = session.createConsumer(queue2);
+
+ c1.setMessageListener(messageList);
+ c2.setMessageListener(messageList);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(new ActiveMQTopic("TEST"));
+ assertNotNull(producer);
+
+ int total = 10;
+ for (int i = 0; i < total; i++) {
+ producer.send(session.createTextMessage("message: " + i));
+ }
+
+ messageList.assertMessagesArrived(total);
+ }
+
+
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ super.tearDown();
+ }
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/VirtualTopicPubSubTest.java
------------------------------------------------------------------------------
svn:eol-style = native