You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 23:10:38 UTC
svn commit: r786283 - in
/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker:
BrokerQueueStore.java TopicSubscription.java
Author: chirino
Date: Thu Jun 18 21:10:38 2009
New Revision: 786283
URL: http://svn.apache.org/viewvc?rev=786283&view=rev
Log:
Fixing testConsumerPrefetchAndStandardAck:
Flow cotrol was not really being applied in the Topic case. Changed it so that topic subs are also backed by queues.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=786283&r1=786282&r2=786283&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Thu Jun 18 21:10:38 2009
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.dispatch.IDispatcher;
@@ -78,6 +79,8 @@
// Be default we don't page out elements to disk.
private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
//private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
+
+ private static long dynamicQueueCounter = 0;
private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
@@ -280,6 +283,21 @@
return queue;
}
+
+
+ public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
+ ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+ synchronized (this) {
+ String name = "temp:"+(dynamicQueueCounter++);
+ queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+ queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+ queue.initialize(0, 0, 0, 0);
+ addQueue(queue.getDescriptor());
+ }
+ return queue;
+ }
+
+
public Collection<ExclusivePersistentQueue<Long, MessageDelivery>> getDurableQueues() {
synchronized (this) {
Collection<ExclusivePersistentQueue<Long, MessageDelivery>> c = durableQueues.values();
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=786283&r1=786282&r2=786283&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Thu Jun 18 21:10:38 2009
@@ -20,6 +20,7 @@
import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
import org.apache.activemq.queue.Subscription;
class TopicSubscription implements BrokerSubscription, DeliveryTarget {
@@ -28,6 +29,9 @@
protected final Destination destination;
protected Subscription<MessageDelivery> connectedSub;
private final VirtualHost host;
+
+ //TODO: replace this with a base interface for queue which also support non persistent use case.
+ private ExclusivePersistentQueue<Long, MessageDelivery> queue;
TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
this.host = host;
@@ -43,9 +47,8 @@
* .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
*/
public final void deliver(MessageDelivery message, ISourceController<?> source) {
- Subscription<MessageDelivery> s = connectedSub;
- if (s != null && matches(message)) {
- s.add(message, source, null);
+ if (matches(message)) {
+ queue.add(message, source);
}
}
@@ -58,30 +61,34 @@
return selector != null;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
- * .broker.protocol.ProtocolHandler.ConsumerContext)
- */
- public synchronized void connect(Subscription<MessageDelivery> subsription) throws UserAlreadyConnectedException {
- connectedSub = subsription;
- host.getRouter().bind(destination, this);
+ public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+ if (this.connectedSub == null) {
+
+ // Ok this is not ideal. Perhaps not all topic subscriptions want this level of service.
+ queue = host.getQueueStore().createExclusivePersistentQueue();
+ queue.start();
+
+ this.connectedSub = subscription;
+ this.queue.addSubscription(connectedSub);
+ this.host.getRouter().bind(destination, this);
+ } else if (connectedSub != subscription) {
+ throw new UserAlreadyConnectedException();
+ }
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
- * .broker.protocol.ProtocolHandler.ConsumerContext)
- */
- public synchronized void disconnect(Subscription<MessageDelivery> context) {
- host.getRouter().unbind(destination, this);
- connectedSub = null;
+ public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+ if (connectedSub != null && connectedSub == subscription) {
+ this.host.getRouter().unbind(destination, this);
+ this.queue.removeSubscription(connectedSub);
+ this.connectedSub = null;
+
+ queue.stop();
+ host.getQueueStore().deleteQueue(queue.getDescriptor());
+ queue=null;
+ }
}
+
public boolean matches(MessageDelivery message) {
if (selector == null) {
return true;