You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/23 06:41:30 UTC

svn commit: r787540 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-client/src/test/java/org/apache/activemq/apollo/test3/ activemq-client/src/test/java/org/apache/activemq/legacy/test3/

Author: chirino
Date: Tue Jun 23 04:41:30 2009
New Revision: 787540

URL: http://svn.apache.org/viewvc?rev=787540&view=rev
Log:
Better composite and wild card subscription handling for the Queue case.

Added:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java
      - copied, changed from r787446, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java
      - copied, changed from r787446, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java
Removed:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java
Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java?rev=787540&r1=787539&r2=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java Tue Jun 23 04:41:30 2009
@@ -20,7 +20,7 @@
 
 public interface BrokerSubscription {
 
-    public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException ;
+    public void connect(ConsumerContext subscription) throws Exception ;
 
     public void disconnect(ConsumerContext subscription);
     

Copied: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java (from r787446, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java?p2=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java&p1=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java&r1=787446&r2=787540&rev=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java Tue Jun 23 04:41:30 2009
@@ -16,15 +16,12 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import java.util.ArrayList;
+
 import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
-import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.FilterException;
-import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.Subscription;
 
 /**
- * MultiSubscription
+ * CompositeSubscription
  * <p>
  * Description:
  * </p>
@@ -32,86 +29,29 @@
  * @author cmacnaug
  * @version 1.0
  */
-public class MultiSubscription implements BrokerSubscription, DeliveryTarget {
+public class CompositeSubscription implements BrokerSubscription {
 
     private final Destination destination;
-    private final VirtualHost host;
-    private final BooleanExpression selector;
-    private Subscription<MessageDelivery> connectedSub;
+    
+    private final ArrayList<BrokerSubscription> subscriptions;
 
-    MultiSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+    public CompositeSubscription(Destination destination, ArrayList<BrokerSubscription> subscriptions) {
         this.destination = destination;
-        this.host = host;
-        this.selector = selector;
+        this.subscriptions = subscriptions;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
-     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
-     */
-    public final void deliver(MessageDelivery message, ISourceController<?> source) {
-        Subscription<MessageDelivery> s = connectedSub;
-        if (s != null) {
-            s.add(message, source, null);
+    public void connect(ConsumerContext consumer) throws Exception {
+        for (BrokerSubscription sub : subscriptions) {
+            sub.connect(consumer);
         }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
-     */
-    public boolean hasSelector() {
-        return selector != null;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void connect(ConsumerContext subsription) throws UserAlreadyConnectedException {
-        connectedSub = subsription;
-        host.getRouter().bind(destination, this);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void disconnect(ConsumerContext context) {
-        host.getRouter().unbind(destination, this);
-        connectedSub = null;
-    }
-
-    public boolean matches(MessageDelivery message) {
-        if (selector == null) {
-            return true;
-        }
-
-        MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
-        selectorContext.setDestination(destination);
-        try {
-            return (selector.matches(selectorContext));
-        } catch (FilterException e) {
-            e.printStackTrace();
-            return false;
+    public synchronized void disconnect(ConsumerContext consumer) {
+        for (BrokerSubscription sub : subscriptions) {
+            sub.disconnect(consumer);
         }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
-     */
     public Destination getDestination() {
         return destination;
     }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=787540&r1=787539&r2=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Tue Jun 23 04:41:30 2009
@@ -18,11 +18,13 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
+import org.apache.activemq.apollo.broker.path.PathFilter;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
 import org.apache.activemq.protobuf.AsciiBuffer;
@@ -164,6 +166,10 @@
             Domain domain = router.getDomain(dest.getDomain());
             domain.bind(dest.getName(), queue);
             queues.put(dest.getName(), queue);
+            
+            for (QueueLifecyleListener l : queueLifecyleListeners) {
+                l.onCreate(queue);
+            }
         }
         queue.start();
         return queue;
@@ -174,41 +180,55 @@
     }
 
     public BrokerSubscription createSubscription(ConsumerContext consumer) throws Exception {
-        Destination destination = consumer.getDestination();
-        BrokerSubscription sub = null;
+        return createSubscription(consumer, consumer.getDestination());
+    }
 
-        if (consumer.isDurable()) {
-            DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
-            if (dsub == null) {
-                ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
-                queue.start();
-                dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
-                durableSubs.put(consumer.getSubscriptionName(), dsub);
-            }
-            sub = dsub;
-        } else {
-            if(destination.getDestinations() != null)
-            {
-                sub = new MultiSubscription(this, destination, consumer.getSelectorExpression());
+    public BrokerSubscription createSubscription(ConsumerContext consumer, Destination destination) throws Exception {
+        
+        // First handle composite destinations..  
+        Collection<Destination> destinations = destination.getDestinations();
+        if(destinations != null) {
+            ArrayList<BrokerSubscription> subs = new ArrayList<BrokerSubscription>(destinations.size());
+            for (Destination childDest : destinations) {
+                subs.add(createSubscription(consumer, childDest));
             }
-            else
-            {
-                if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
-                    sub = new TopicSubscription(this, destination, consumer.getSelectorExpression());
-                } else {
-                    Queue queue = queues.get(destination.getName());
-                    if( queue == null ) {
-                    	if( consumer.autoCreateDestination() ) {
-                    		queue = createQueue(destination);
-                    	} else {
-                    		throw new IllegalStateException("The queue does not exist: "+destination.getName());
-                    	}
-                    }
-                    sub = new Queue.QueueSubscription(queue);
+            return new CompositeSubscription(destination, subs);
+        }
+                
+        // If it's a Topic...
+        if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
+            
+            // It might be a durable subscription on the topic
+            if (consumer.isDurable()) {
+                DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
+                if (dsub == null) {
+                    ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
+                    queue.start();
+                    dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
+                    durableSubs.put(consumer.getSubscriptionName(), dsub);
                 }
+                return dsub;
             }
+
+            // return a standard subscription
+            return new TopicSubscription(this, destination, consumer.getSelectorExpression());
+        }
+        
+        // It looks like a wild card subscription on a queue.. 
+        if( PathFilter.containsWildCards(destination.getName()) ){
+            return new WildcardQueueSubscription(this, destination, consumer);
         }
-        return sub;
+
+        // It has to be a Queue subscription then..
+        Queue queue = queues.get(destination.getName());
+        if( queue == null ) {
+            if( consumer.autoCreateDestination() ) {
+                queue = createQueue(destination);
+            } else {
+                throw new IllegalStateException("The queue does not exist: "+destination.getName());
+            }
+        }
+        return new Queue.QueueSubscription(queue);
     }
 
 	public Broker getBroker() {
@@ -218,4 +238,30 @@
 	public void setBroker(Broker broker) {
 		this.broker = broker;
 	}
+	
+	interface QueueLifecyleListener {
+	    
+	    /**
+	     * A queue has been created
+	     * @param queue the queue that was created
+	     */
+        public void onCreate(Queue queue);
+        
+        /**
+         * A queue has been destroyed 
+         * @param queue the queue that was destroyed
+         */
+        public void onDestroy(Queue queue);
+                
+	}
+	
+	ArrayList<QueueLifecyleListener> queueLifecyleListeners = new ArrayList<QueueLifecyleListener>();
+
+	synchronized public void addDestinationLifecyleListener(QueueLifecyleListener listener) {
+        queueLifecyleListeners.add(listener);
+    }
+
+    synchronized public void removeDestinationLifecyleListener(QueueLifecyleListener listener) {
+        queueLifecyleListeners.remove(listener); // unregister the listener; calling add() here would register it a second time
+    }
 }

Copied: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java (from r787446, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java?p2=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java&p1=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java&r1=787446&r2=787540&rev=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java Tue Jun 23 04:41:30 2009
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.legacy.test3;
+package org.apache.activemq.apollo.test3;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -25,7 +25,6 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.apollo.test3.JmsTopicSendReceiveTest;
 import org.apache.activemq.command.ActiveMQDestination;
 
 /**