You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 21:02:06 UTC
svn commit: r786204 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/
activemq-openwire/src/test/java/org/apache/activemq/legacy/o...
Author: chirino
Date: Thu Jun 18 19:02:06 2009
New Revision: 786204
URL: http://svn.apache.org/viewvc?rev=786204&view=rev
Log:
Fixed testQueueSendThenAddConsumer
* Queue destinations need to be auto-created on demand: when a message is routed to a queue that does not exist yet, the router now creates it.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Thu Jun 18 19:02:06 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.apollo.broker;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,9 +30,12 @@
import org.apache.activemq.apollo.broker.TopicDomain;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
final public class Router {
-
+ static final private Log LOG = LogFactory.getLog(Router.class);
+
public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
@@ -84,9 +88,9 @@
}
}
- public void route(final BrokerMessageDelivery msg, ISourceController<?> controller) {
+ public void route(final BrokerMessageDelivery msg, ISourceController<?> controller, boolean autoCreate) {
- Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
+ Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, autoCreate);
//Set up the delivery for persistence:
msg.beginDispatch(database);
@@ -116,16 +120,28 @@
}
}
- private Collection<DeliveryTarget> route(Destination destination, MessageDelivery msg) {
+ private Collection<DeliveryTarget> route(Destination destination, MessageDelivery msg, boolean autoCreate) {
// Handles routing to composite/multi destinations.
Collection<Destination> destinationList = destination.getDestinations();
if (destinationList == null) {
Domain domain = domains.get(destination.getDomain());
- return domain.route(destination.getName(), msg);
+ Collection<DeliveryTarget> rc = domain.route(destination.getName(), msg);
+ // We can auto create queues in the queue domain..
+ if(rc==null && autoCreate && destination.getDomain().equals(Router.QUEUE_DOMAIN) ) {
+ try {
+ Queue queue = virtualHost.createQueue(destination);
+ rc = new ArrayList<DeliveryTarget>(1);
+ rc.add(queue);
+ } catch (Exception e) {
+ LOG.error("Failed to auto create queue: "+destination.getName()+": "+e);
+ LOG.debug("Failed to auto create queue: "+destination.getName(),e);
+ }
+ }
+ return rc;
} else {
HashSet<DeliveryTarget> rc = new HashSet<DeliveryTarget>();
for (Destination d : destinationList) {
- Collection<DeliveryTarget> t = route(d, msg);
+ Collection<DeliveryTarget> t = route(d, msg, autoCreate);
if (t != null) {
rc.addAll(t);
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 18 19:02:06 2009
@@ -423,7 +423,7 @@
controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
- router.route(msg, controller);
+ router.route(msg, controller, true);
controller.elementDispatched(msg);
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Thu Jun 18 19:02:06 2009
@@ -43,7 +43,39 @@
public byte destinationType;
public boolean durableConsumer;
protected static final int MAX_NULL_WAIT=500;
+ public void initCombosForTestQueueSendThenAddConsumer() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+ }
+
+ public void testQueueSendThenAddConsumer() throws Exception {
+ // Start a producer
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+ // Send a message to the broker.
+ connection.send(createMessage(producerInfo, destination, deliveryMode));
+
+ // Start the consumer
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Make sure the message was delivered.
+ Message m = receiveMessage(connection);
+ assertNotNull(m);
+
+ }
public void initCombosForTestCompositeSend() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1251,39 +1283,6 @@
assertNoMessagesLeft(connection);
}
- public void initCombosForTestQueueSendThenAddConsumer() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType",
- new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
- }
-
- public void testQueueSendThenAddConsumer() throws Exception {
-
- // Start a producer
- StubConnection connection = createConnection();
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
-
- destination = createDestinationInfo(connection, connectionInfo, destinationType);
-
- // Send a message to the broker.
- connection.send(createMessage(producerInfo, destination, deliveryMode));
-
- // Start the consumer
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
-
- // Make sure the message was delivered.
- Message m = receiveMessage(connection);
- assertNotNull(m);
-
- }
public void initCombosForTestQueueAckRemovesMessage() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Thu Jun 18 19:02:06 2009
@@ -20,7 +20,6 @@
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -238,7 +237,7 @@
if (elem.isResponseRequired()) {
elem.setPersistListener(StompProtocolHandler.this);
}
- router.route(elem, controller);
+ router.route(elem, controller, true);
controller.elementDispatched(elem);
}