You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 21:02:06 UTC

svn commit: r786204 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ activemq-openwire/src/test/java/org/apache/activemq/legacy/o...

Author: chirino
Date: Thu Jun 18 19:02:06 2009
New Revision: 786204

URL: http://svn.apache.org/viewvc?rev=786204&view=rev
Log:
Fixed testQueueSendThenAddConsumer
 * Need to auto create destination on demand.


Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Thu Jun 18 19:02:06 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo.broker;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,9 +30,12 @@
 import org.apache.activemq.apollo.broker.TopicDomain;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 final public class Router {
-
+	static final private Log LOG = LogFactory.getLog(Router.class); 
+	
     public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
     public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
     public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
@@ -84,9 +88,9 @@
         }
     }
 
-    public void route(final BrokerMessageDelivery msg, ISourceController<?> controller) {
+    public void route(final BrokerMessageDelivery msg, ISourceController<?> controller, boolean autoCreate) {
 
-        Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
+        Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, autoCreate);
 
         //Set up the delivery for persistence:
         msg.beginDispatch(database);
@@ -116,16 +120,28 @@
         }
     }
 
-    private Collection<DeliveryTarget> route(Destination destination, MessageDelivery msg) {
+    private Collection<DeliveryTarget> route(Destination destination, MessageDelivery msg, boolean autoCreate) {
         // Handles routing to composite/multi destinations.
         Collection<Destination> destinationList = destination.getDestinations();
         if (destinationList == null) {
             Domain domain = domains.get(destination.getDomain());
-            return domain.route(destination.getName(), msg);
+            Collection<DeliveryTarget> rc = domain.route(destination.getName(), msg);
+            // We can auto create queues in the queue domain..
+            if(rc==null && autoCreate && destination.getDomain().equals(Router.QUEUE_DOMAIN) ) {
+            	try {
+					Queue queue = virtualHost.createQueue(destination);
+					rc = new ArrayList<DeliveryTarget>(1);
+					rc.add(queue);
+				} catch (Exception e) {
+					LOG.error("Failed to auto create queue: "+destination.getName()+": "+e);
+					LOG.debug("Failed to auto create queue: "+destination.getName(),e);
+				}
+            }
+			return rc;
         } else {
             HashSet<DeliveryTarget> rc = new HashSet<DeliveryTarget>();
             for (Destination d : destinationList) {
-                Collection<DeliveryTarget> t = route(d, msg);
+                Collection<DeliveryTarget> t = route(d, msg, autoCreate);
                 if (t != null) {
                     rc.addAll(t);
                 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 18 19:02:06 2009
@@ -423,7 +423,7 @@
 
             controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
                 public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
-                    router.route(msg, controller);
+                    router.route(msg, controller, true);
                     controller.elementDispatched(msg);
                 }
 

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Thu Jun 18 19:02:06 2009
@@ -43,7 +43,39 @@
     public byte destinationType;
     public boolean durableConsumer;
     protected static final int MAX_NULL_WAIT=500;
+    public void initCombosForTestQueueSendThenAddConsumer() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+    }
+
+    public void testQueueSendThenAddConsumer() throws Exception {
 
+        // Start a producer
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+        // Send a message to the broker.
+        connection.send(createMessage(producerInfo, destination, deliveryMode));
+
+        // Start the consumer
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Make sure the message was delivered.
+        Message m = receiveMessage(connection);
+        assertNotNull(m);
+
+    }
     public void initCombosForTestCompositeSend() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
                                                            Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1251,39 +1283,6 @@
         assertNoMessagesLeft(connection);
     }
 
-    public void initCombosForTestQueueSendThenAddConsumer() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
-    }
-
-    public void testQueueSendThenAddConsumer() throws Exception {
-
-        // Start a producer
-        StubConnection connection = createConnection();
-        ConnectionInfo connectionInfo = createConnectionInfo();
-        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-        connection.send(connectionInfo);
-        connection.send(sessionInfo);
-        connection.send(producerInfo);
-
-        destination = createDestinationInfo(connection, connectionInfo, destinationType);
-
-        // Send a message to the broker.
-        connection.send(createMessage(producerInfo, destination, deliveryMode));
-
-        // Start the consumer
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
-        // Make sure the message was delivered.
-        Message m = receiveMessage(connection);
-        assertNotNull(m);
-
-    }
 
     public void initCombosForTestQueueAckRemovesMessage() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Thu Jun 18 19:02:06 2009
@@ -20,7 +20,6 @@
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -238,7 +237,7 @@
                     if (elem.isResponseRequired()) {
                         elem.setPersistListener(StompProtocolHandler.this);
                     }
-                    router.route(elem, controller);
+                    router.route(elem, controller, true);
                     controller.elementDispatched(elem);
                 }