You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/19 17:52:59 UTC
svn commit: r786561 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/
activemq-queue/src/main/java/org/apache/activemq/queue/
Author: chirino
Date: Fri Jun 19 15:52:58 2009
New Revision: 786561
URL: http://svn.apache.org/viewvc?rev=786561&view=rev
Log:
Starting to make the TopicSubscription implementation more configurable.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,13 +16,13 @@
*/
package org.apache.activemq.apollo.broker;
-import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
public interface BrokerSubscription {
- public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException ;
+ public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException ;
- public void disconnect(Subscription<MessageDelivery> subscription);
+ public void disconnect(ConsumerContext subscription);
public Destination getDestination();
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.apollo.broker;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
@@ -56,7 +57,7 @@
queue.add(message, source);
}
- public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+ public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
if (this.connectedSub == null) {
this.connectedSub = subscription;
queue.addSubscription(connectedSub);
@@ -65,7 +66,7 @@
}
}
- public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+ public synchronized void disconnect(final ConsumerContext subscription) {
if (connectedSub != null && connectedSub == subscription) {
queue.removeSubscription(connectedSub);
connectedSub = null;
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.apollo.broker;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
@@ -74,7 +75,7 @@
* org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
* .broker.protocol.ProtocolHandler.ConsumerContext)
*/
- public synchronized void connect(Subscription<MessageDelivery> subsription) throws UserAlreadyConnectedException {
+ public synchronized void connect(ConsumerContext subsription) throws UserAlreadyConnectedException {
connectedSub = subsription;
host.getRouter().bind(destination, this);
}
@@ -86,7 +87,7 @@
* org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
* .broker.protocol.ProtocolHandler.ConsumerContext)
*/
- public synchronized void disconnect(Subscription<MessageDelivery> context) {
+ public synchronized void disconnect(ConsumerContext context) {
host.getRouter().unbind(destination, this);
connectedSub = null;
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java Fri Jun 19 15:52:58 2009
@@ -16,9 +16,7 @@
*/
package org.apache.activemq.apollo.broker;
-import org.apache.activemq.apollo.broker.DeliveryTarget;
-import org.apache.activemq.apollo.broker.Destination;
-import org.apache.activemq.apollo.broker.MessageDelivery;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.Subscription;
@@ -111,7 +109,7 @@
* org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
* activemq.broker.protocol.ProtocolHandler.ConsumerContext)
*/
- public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+ public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
this.subscription = subscription;
queue.addSubscription(subscription);
}
@@ -123,7 +121,7 @@
* org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
* .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
*/
- public void disconnect(Subscription<MessageDelivery> context) {
+ public void disconnect(ConsumerContext context) {
queue.removeSubscription(subscription);
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,22 +16,30 @@
*/
package org.apache.activemq.apollo.broker;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowLimiter;
import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IFlowQueue;
import org.apache.activemq.queue.Subscription;
class TopicSubscription implements BrokerSubscription, DeliveryTarget {
+ static final boolean USE_PERSISTENT_QUEUES = true;
+
protected final BooleanExpression selector;
protected final Destination destination;
protected Subscription<MessageDelivery> connectedSub;
private final VirtualHost host;
//TODO: replace this with a base interface for queue which also support non persistent use case.
- private ExclusivePersistentQueue<Long, MessageDelivery> queue;
+ private IFlowQueue<MessageDelivery> queue;
TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
this.host = host;
@@ -61,12 +69,16 @@
return selector != null;
}
- public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+ public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
if (this.connectedSub == null) {
// Ok this is not ideal. Perhaps not all topic subscriptions want this level of service.
- queue = host.getQueueStore().createExclusivePersistentQueue();
- queue.start();
+ if( USE_PERSISTENT_QUEUES ) {
+ queue = createPersistentQueue(subscription);
+ } else {
+ queue = createNonPersistentQueue(subscription);
+ }
+ queue.start();
this.connectedSub = subscription;
this.queue.addSubscription(connectedSub);
@@ -76,20 +88,43 @@
}
}
- public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+ private IFlowQueue<MessageDelivery> createNonPersistentQueue(ConsumerContext subscription) {
+ Flow flow = new Flow(subscription.getResourceName(), false);
+ String name = subscription.getResourceName();
+ IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
+ ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
+ queue.setDispatcher(host.getBroker().getDispatcher());
+ return queue;
+ }
+
+ private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
+ ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
+ return queue;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
+ ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
+ host.getQueueStore().deleteQueue(pq.getDescriptor());
+ }
+
+ public synchronized void disconnect(final ConsumerContext subscription) {
if (connectedSub != null && connectedSub == subscription) {
this.host.getRouter().unbind(destination, this);
this.queue.removeSubscription(connectedSub);
this.connectedSub = null;
queue.stop();
- host.getQueueStore().deleteQueue(queue.getDescriptor());
+ if( USE_PERSISTENT_QUEUES ) {
+ destroyPersistentQueue(queue);
+ }
queue=null;
}
}
- public boolean matches(MessageDelivery message) {
+
+ public boolean matches(MessageDelivery message) {
if (selector == null) {
return true;
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 15:52:58 2009
@@ -92,7 +92,7 @@
// The 2nd connection should get the messages.
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection2);
- assertNotNull(m1);
+ assertNotNull("Message: "+i, m1);
}
// Send a message with the 2nd connection
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java Fri Jun 19 15:52:58 2009
@@ -70,4 +70,8 @@
* The base priority for the queue
*/
public void setDispatchPriority(int priority);
+
+ public void start();
+
+ public void stop();
}