You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/23 06:41:30 UTC
svn commit: r787540 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-client/src/test/java/org/apache/activemq/apollo/test3/
activemq-client/src/test/java/org/apache/activemq/legacy/test3/
Author: chirino
Date: Tue Jun 23 04:41:30 2009
New Revision: 787540
URL: http://svn.apache.org/viewvc?rev=787540&view=rev
Log:
Better composite and wild card subscription handling for the Queue case.
Added:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java
- copied, changed from r787446, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java
- copied, changed from r787446, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java
Removed:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java?rev=787540&r1=787539&r2=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java Tue Jun 23 04:41:30 2009
@@ -20,7 +20,7 @@
public interface BrokerSubscription {
- public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException ;
+ public void connect(ConsumerContext subscription) throws Exception ;
public void disconnect(ConsumerContext subscription);
Copied: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java (from r787446, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java?p2=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java&p1=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java&r1=787446&r2=787540&rev=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java Tue Jun 23 04:41:30 2009
@@ -16,15 +16,12 @@
*/
package org.apache.activemq.apollo.broker;
+import java.util.ArrayList;
+
import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
-import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.FilterException;
-import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.Subscription;
/**
- * MultiSubscription
+ * CompositeSubscription
* <p>
* Description:
* </p>
@@ -32,86 +29,29 @@
* @author cmacnaug
* @version 1.0
*/
-public class MultiSubscription implements BrokerSubscription, DeliveryTarget {
+public class CompositeSubscription implements BrokerSubscription {
private final Destination destination;
- private final VirtualHost host;
- private final BooleanExpression selector;
- private Subscription<MessageDelivery> connectedSub;
+
+ private final ArrayList<BrokerSubscription> subscriptions;
- MultiSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+ public CompositeSubscription(Destination destination, ArrayList<BrokerSubscription> subscriptions) {
this.destination = destination;
- this.host = host;
- this.selector = selector;
+ this.subscriptions = subscriptions;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
- * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
- */
- public final void deliver(MessageDelivery message, ISourceController<?> source) {
- Subscription<MessageDelivery> s = connectedSub;
- if (s != null) {
- s.add(message, source, null);
+ public void connect(ConsumerContext consumer) throws Exception {
+ for (BrokerSubscription sub : subscriptions) {
+ sub.connect(consumer);
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
- */
- public boolean hasSelector() {
- return selector != null;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
- * .broker.protocol.ProtocolHandler.ConsumerContext)
- */
- public synchronized void connect(ConsumerContext subsription) throws UserAlreadyConnectedException {
- connectedSub = subsription;
- host.getRouter().bind(destination, this);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
- * .broker.protocol.ProtocolHandler.ConsumerContext)
- */
- public synchronized void disconnect(ConsumerContext context) {
- host.getRouter().unbind(destination, this);
- connectedSub = null;
- }
-
- public boolean matches(MessageDelivery message) {
- if (selector == null) {
- return true;
- }
-
- MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
- selectorContext.setDestination(destination);
- try {
- return (selector.matches(selectorContext));
- } catch (FilterException e) {
- e.printStackTrace();
- return false;
+ public synchronized void disconnect(ConsumerContext consumer) {
+ for (BrokerSubscription sub : subscriptions) {
+ sub.disconnect(consumer);
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
- */
public Destination getDestination() {
return destination;
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=787540&r1=787539&r2=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Tue Jun 23 04:41:30 2009
@@ -18,11 +18,13 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.apache.activemq.Service;
import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
+import org.apache.activemq.apollo.broker.path.PathFilter;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
import org.apache.activemq.protobuf.AsciiBuffer;
@@ -164,6 +166,10 @@
Domain domain = router.getDomain(dest.getDomain());
domain.bind(dest.getName(), queue);
queues.put(dest.getName(), queue);
+
+ for (QueueLifecyleListener l : queueLifecyleListeners) {
+ l.onCreate(queue);
+ }
}
queue.start();
return queue;
@@ -174,41 +180,55 @@
}
public BrokerSubscription createSubscription(ConsumerContext consumer) throws Exception {
- Destination destination = consumer.getDestination();
- BrokerSubscription sub = null;
+ return createSubscription(consumer, consumer.getDestination());
+ }
- if (consumer.isDurable()) {
- DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
- if (dsub == null) {
- ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
- queue.start();
- dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
- durableSubs.put(consumer.getSubscriptionName(), dsub);
- }
- sub = dsub;
- } else {
- if(destination.getDestinations() != null)
- {
- sub = new MultiSubscription(this, destination, consumer.getSelectorExpression());
+ public BrokerSubscription createSubscription(ConsumerContext consumer, Destination destination) throws Exception {
+
+ // First handle composite destinations..
+ Collection<Destination> destinations = destination.getDestinations();
+ if(destinations != null) {
+ ArrayList<BrokerSubscription> subs = new ArrayList<BrokerSubscription>(destinations.size());
+ for (Destination childDest : destinations) {
+ subs.add(createSubscription(consumer, childDest));
}
- else
- {
- if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
- sub = new TopicSubscription(this, destination, consumer.getSelectorExpression());
- } else {
- Queue queue = queues.get(destination.getName());
- if( queue == null ) {
- if( consumer.autoCreateDestination() ) {
- queue = createQueue(destination);
- } else {
- throw new IllegalStateException("The queue does not exist: "+destination.getName());
- }
- }
- sub = new Queue.QueueSubscription(queue);
+ return new CompositeSubscription(destination, subs);
+ }
+
+ // If it's a Topic...
+ if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
+
+ // It might be a durable subscription on the topic
+ if (consumer.isDurable()) {
+ DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
+ if (dsub == null) {
+ ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
+ queue.start();
+ dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
+ durableSubs.put(consumer.getSubscriptionName(), dsub);
}
+ return dsub;
}
+
+ // return a standard subscription
+ return new TopicSubscription(this, destination, consumer.getSelectorExpression());
+ }
+
+ // It looks like a wild card subscription on a queue..
+ if( PathFilter.containsWildCards(destination.getName()) ){
+ return new WildcardQueueSubscription(this, destination, consumer);
}
- return sub;
+
+ // It has to be a Queue subscription then..
+ Queue queue = queues.get(destination.getName());
+ if( queue == null ) {
+ if( consumer.autoCreateDestination() ) {
+ queue = createQueue(destination);
+ } else {
+ throw new IllegalStateException("The queue does not exist: "+destination.getName());
+ }
+ }
+ return new Queue.QueueSubscription(queue);
}
public Broker getBroker() {
@@ -218,4 +238,30 @@
public void setBroker(Broker broker) {
this.broker = broker;
}
+
+ interface QueueLifecyleListener {
+
+ /**
+ * A destination has bean created
+ * @param destination
+ */
+ public void onCreate(Queue queue);
+
+ /**
+ * A destination has bean destroyed
+ * @param destination
+ */
+ public void onDestroy(Queue queue);
+
+ }
+
+ ArrayList<QueueLifecyleListener> queueLifecyleListeners = new ArrayList<QueueLifecyleListener>();
+
+ synchronized public void addDestinationLifecyleListener(QueueLifecyleListener listener) {
+ queueLifecyleListeners.add(listener);
+ }
+
+ synchronized public void removeDestinationLifecyleListener(QueueLifecyleListener listener) {
+ queueLifecyleListeners.add(listener);
+ }
}
Copied: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java (from r787446, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java?p2=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java&p1=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java&r1=787446&r2=787540&rev=787540&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java Tue Jun 23 04:41:30 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.legacy.test3;
+package org.apache.activemq.apollo.test3;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -25,7 +25,6 @@
import javax.jms.Session;
import javax.jms.TextMessage;
-import org.apache.activemq.apollo.test3.JmsTopicSendReceiveTest;
import org.apache.activemq.command.ActiveMQDestination;
/**