You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/19 17:52:59 UTC

svn commit: r786561 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/ activemq-queue/src/main/java/org/apache/activemq/queue/

Author: chirino
Date: Fri Jun 19 15:52:58 2009
New Revision: 786561

URL: http://svn.apache.org/viewvc?rev=786561&view=rev
Log:
Starting to make the TopicSubscription implementation more configurable.

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 
 public interface BrokerSubscription {
 
-    public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException ;
+    public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException ;
 
-    public void disconnect(Subscription<MessageDelivery> subscription);
+    public void disconnect(ConsumerContext subscription);
     
     public Destination getDestination();
     

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -56,7 +57,7 @@
         queue.add(message, source);
     }
 
-    public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+    public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
         if (this.connectedSub == null) {
             this.connectedSub = subscription;
             queue.addSubscription(connectedSub);
@@ -65,7 +66,7 @@
         }
     }
 
-    public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+    public synchronized void disconnect(final ConsumerContext subscription) {
         if (connectedSub != null && connectedSub == subscription) {
             queue.removeSubscription(connectedSub);
             connectedSub = null;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -74,7 +75,7 @@
      * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
      * .broker.protocol.ProtocolHandler.ConsumerContext)
      */
-    public synchronized void connect(Subscription<MessageDelivery> subsription) throws UserAlreadyConnectedException {
+    public synchronized void connect(ConsumerContext subsription) throws UserAlreadyConnectedException {
         connectedSub = subsription;
         host.getRouter().bind(destination, this);
     }
@@ -86,7 +87,7 @@
      * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
      * .broker.protocol.ProtocolHandler.ConsumerContext)
      */
-    public synchronized void disconnect(Subscription<MessageDelivery> context) {
+    public synchronized void disconnect(ConsumerContext context) {
         host.getRouter().unbind(destination, this);
         connectedSub = null;
     }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java Fri Jun 19 15:52:58 2009
@@ -16,9 +16,7 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import org.apache.activemq.apollo.broker.DeliveryTarget;
-import org.apache.activemq.apollo.broker.Destination;
-import org.apache.activemq.apollo.broker.MessageDelivery;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.Subscription;
@@ -111,7 +109,7 @@
          * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
          * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
          */
-        public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
             this.subscription = subscription;
             queue.addSubscription(subscription);
         }
@@ -123,7 +121,7 @@
          * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
          * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
          */
-        public void disconnect(Subscription<MessageDelivery> context) {
+        public void disconnect(ConsumerContext context) {
             queue.removeSubscription(subscription);
         }
         

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,22 +16,30 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowLimiter;
 import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IFlowQueue;
 import org.apache.activemq.queue.Subscription;
 
 class TopicSubscription implements BrokerSubscription, DeliveryTarget {
 
+	static final boolean USE_PERSISTENT_QUEUES = true; 
+	
     protected final BooleanExpression selector;
     protected final Destination destination;
     protected Subscription<MessageDelivery> connectedSub;
     private final VirtualHost host;
     
     //TODO: replace this with a base interface for queue which also support non persistent use case.
-	private ExclusivePersistentQueue<Long, MessageDelivery> queue;
+	private IFlowQueue<MessageDelivery> queue;
 
     TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
         this.host = host;
@@ -61,12 +69,16 @@
         return selector != null;
     }
 
-    public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+    public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
         if (this.connectedSub == null) {
         	
         	// Ok this is not ideal.  Perhaps not all topic subscriptions want this level of service.
-            queue = host.getQueueStore().createExclusivePersistentQueue();
-            queue.start();
+        	if( USE_PERSISTENT_QUEUES ) {
+        		queue = createPersistentQueue(subscription);
+        	} else {
+        		queue = createNonPersistentQueue(subscription);
+        	}
+    		queue.start();
         	
         	this.connectedSub = subscription;
         	this.queue.addSubscription(connectedSub);
@@ -76,20 +88,43 @@
         }
     }
 
-    public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+    private IFlowQueue<MessageDelivery> createNonPersistentQueue(ConsumerContext subscription) {
+		Flow flow = new Flow(subscription.getResourceName(), false);
+		String name = subscription.getResourceName();
+		IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
+		ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
+		queue.setDispatcher(host.getBroker().getDispatcher());
+		return queue;
+	}
+
+	private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
+        ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
+        return queue;
+	}
+
+    @SuppressWarnings("unchecked")
+	private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
+    	ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
+		host.getQueueStore().deleteQueue(pq.getDescriptor());
+	}
+
+	public synchronized void disconnect(final ConsumerContext subscription) {
         if (connectedSub != null && connectedSub == subscription) {
     		this.host.getRouter().unbind(destination, this);
     		this.queue.removeSubscription(connectedSub);
     		this.connectedSub = null;
     		
     		queue.stop();
-    		host.getQueueStore().deleteQueue(queue.getDescriptor());
+        	if( USE_PERSISTENT_QUEUES ) {
+        		destroyPersistentQueue(queue);
+        	}
     		queue=null;
         }
     }
 
 
-    public boolean matches(MessageDelivery message) {
+
+	public boolean matches(MessageDelivery message) {
         if (selector == null) {
             return true;
         }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 15:52:58 2009
@@ -92,7 +92,7 @@
         // The 2nd connection should get the messages.
         for (int i = 0; i < 4; i++) {
             Message m1 = receiveMessage(connection2);
-            assertNotNull(m1);
+            assertNotNull("Message: "+i, m1);
         }
 
         // Send a message with the 2nd connection

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java Fri Jun 19 15:52:58 2009
@@ -70,4 +70,8 @@
      *            The base priority for the queue
      */
     public void setDispatchPriority(int priority);
+    
+    public void start();
+
+    public void stop();    
 }