You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/23 19:32:35 UTC

svn commit: r787755 - /activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java

Author: chirino
Date: Tue Jun 23 17:32:34 2009
New Revision: 787755

URL: http://svn.apache.org/viewvc?rev=787755&view=rev
Log:
opps missed this file.

Added:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java?rev=787755&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java Tue Jun 23 17:32:34 2009
@@ -0,0 +1,82 @@
+package org.apache.activemq.apollo.broker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
+import org.apache.activemq.apollo.broker.VirtualHost.QueueLifecyleListener;
+import org.apache.activemq.apollo.broker.path.PathFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class WildcardQueueSubscription implements BrokerSubscription, QueueLifecyleListener {
+
+    static final private Log LOG = LogFactory.getLog(WildcardQueueSubscription.class);
+    
+    private final VirtualHost host;
+    private final Destination destination;
+    private final ConsumerContext consumer;
+    private final PathFilter filter;
+    
+    private final ArrayList<BrokerSubscription> childSubs = new ArrayList<BrokerSubscription>();
+
+    public WildcardQueueSubscription(VirtualHost host, Destination destination, ConsumerContext consumer) {
+        this.host = host;
+        this.destination = destination;
+        this.consumer = consumer;
+        filter = PathFilter.parseFilter(destination.getName());
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // BrokerSubscription interface implementation
+    ///////////////////////////////////////////////////////////////////
+    public void connect(ConsumerContext cc) throws Exception {
+        assert cc == consumer;
+        synchronized(host) {
+            Domain domain = host.getRouter().getDomain(Router.QUEUE_DOMAIN);
+            Collection<DeliveryTarget> matches = domain.route(destination.getName(), null);
+            for (DeliveryTarget target : matches) {
+                Queue queue = (Queue) target;
+                BrokerSubscription childSub = host.createSubscription(consumer, queue.getDestination());
+                childSubs.add(childSub);
+                childSub.connect(consumer);
+            }
+            host.addDestinationLifecyleListener(this);
+        }
+    }
+
+    public void disconnect(ConsumerContext cc) {
+        assert cc == consumer;
+        synchronized(host) {
+            host.removeDestinationLifecyleListener(this);
+            for (BrokerSubscription childSub : childSubs) {
+                childSub.disconnect(cc);
+            }
+            childSubs.clear();
+        }
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // QueueLifecyleListener interface implementation
+    ///////////////////////////////////////////////////////////////////
+    public void onCreate(Queue queue) {
+        if( filter.matches(queue.getDestination().getName()) ) {
+            try {
+                BrokerSubscription childSub = host.createSubscription(consumer, queue.getDestination());
+                childSubs.add(childSub);
+                childSub.connect(consumer);
+            } catch (Exception e) {
+                LOG.warn("Could not create dynamic subscription to "+queue.getDestination()+": "+e);
+                LOG.debug("Could not create dynamic subscription to "+queue.getDestination()+": ", e);
+            }
+        }
+    }
+
+    public void onDestroy(Queue queue) {
+    }
+
+}