You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 23:10:38 UTC

svn commit: r786283 - in /activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker: BrokerQueueStore.java TopicSubscription.java

Author: chirino
Date: Thu Jun 18 21:10:38 2009
New Revision: 786283

URL: http://svn.apache.org/viewvc?rev=786283&view=rev
Log:
Fixing testConsumerPrefetchAndStandardAck:
 Flow control was not really being applied in the Topic case.  Changed it so that topic subs are also backed by queues.  

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=786283&r1=786282&r2=786283&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Thu Jun 18 21:10:38 2009
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.dispatch.IDispatcher;
@@ -78,6 +79,8 @@
     // Be default we don't page out elements to disk.
     private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
     //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
+    
+    private static long dynamicQueueCounter = 0;
 
     private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
 
@@ -280,6 +283,21 @@
         return queue;
     }
 
+    
+    
+    public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
+        ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+        synchronized (this) {
+            String name = "temp:"+(dynamicQueueCounter++);
+            queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+            queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+            queue.initialize(0, 0, 0, 0);
+            addQueue(queue.getDescriptor());
+        }
+        return queue;
+    }
+    
+    
     public Collection<ExclusivePersistentQueue<Long, MessageDelivery>> getDurableQueues() {
         synchronized (this) {
             Collection<ExclusivePersistentQueue<Long, MessageDelivery>> c = durableQueues.values();

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=786283&r1=786282&r2=786283&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Thu Jun 18 21:10:38 2009
@@ -20,6 +20,7 @@
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.Subscription;
 
 class TopicSubscription implements BrokerSubscription, DeliveryTarget {
@@ -28,6 +29,9 @@
     protected final Destination destination;
     protected Subscription<MessageDelivery> connectedSub;
     private final VirtualHost host;
+    
+    //TODO: replace this with a base interface for queue which also support non persistent use case.
+	private ExclusivePersistentQueue<Long, MessageDelivery> queue;
 
     TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
         this.host = host;
@@ -43,9 +47,8 @@
      * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
      */
     public final void deliver(MessageDelivery message, ISourceController<?> source) {
-        Subscription<MessageDelivery> s = connectedSub;
-        if (s != null && matches(message)) {
-            s.add(message, source, null);
+        if (matches(message)) {
+            queue.add(message, source);
         }
     }
 
@@ -58,30 +61,34 @@
         return selector != null;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void connect(Subscription<MessageDelivery> subsription) throws UserAlreadyConnectedException {
-        connectedSub = subsription;
-        host.getRouter().bind(destination, this);
+    public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+        if (this.connectedSub == null) {
+        	
+        	// Ok this is not ideal.  Perhaps not all topic subscriptions want this level of service.
+            queue = host.getQueueStore().createExclusivePersistentQueue();
+            queue.start();
+        	
+        	this.connectedSub = subscription;
+        	this.queue.addSubscription(connectedSub);
+    		this.host.getRouter().bind(destination, this);
+        } else if (connectedSub != subscription) {
+            throw new UserAlreadyConnectedException();
+        }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void disconnect(Subscription<MessageDelivery> context) {
-        host.getRouter().unbind(destination, this);
-        connectedSub = null;
+    public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+        if (connectedSub != null && connectedSub == subscription) {
+    		this.host.getRouter().unbind(destination, this);
+    		this.queue.removeSubscription(connectedSub);
+    		this.connectedSub = null;
+    		
+    		queue.stop();
+    		host.getQueueStore().deleteQueue(queue.getDescriptor());
+    		queue=null;
+        }
     }
 
+
     public boolean matches(MessageDelivery message) {
         if (selector == null) {
             return true;