You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/19 06:02:39 UTC
svn commit: r786364 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/
activemq-openwire/src/test/java/org/apache/activemq/legacy/openwire...
Author: cmacnaug
Date: Fri Jun 19 04:02:38 2009
New Revision: 786364
URL: http://svn.apache.org/viewvc?rev=786364&view=rev
Log:
Adding support for exclusive consumers on shared queues.
Also:
-Refactored SharedPriorityQueue to extend partitioned queue (eventually we'll
want partitioned queues to assist in exclusivity ... for now each partion is exlusive but it isn't enforced across partitions)
-Fixed legacy openwire exclusive consumer test so that it correctly tests exclusivity.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Jun 19 04:02:38 2009
@@ -53,8 +53,8 @@
protected final boolean USE_KAHA_DB = true;
protected final boolean PURGE_STORE = true;
- protected final boolean PERSISTENT = true;
- protected final boolean DURABLE = true;
+ protected final boolean PERSISTENT = false;
+ protected final boolean DURABLE = false;
// Set to put senders and consumers on separate brokers.
protected boolean multibroker = false;
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Jun 19 04:02:38 2009
@@ -107,7 +107,7 @@
public MessageRecord createMessageRecord() {
MessageRecord record = new MessageRecord();
record.setEncoding(ENCODING);
-
+
ByteSequence bytes;
try {
bytes = storeWireFormat.marshal(message);
@@ -130,7 +130,9 @@
this.storeWireFormat = wireFormat;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.activemq.broker.MessageDelivery#getTTE()
*/
public long getExpiration() {
@@ -140,4 +142,8 @@
public MessageEvaluationContext createMessageEvaluationContext() {
return new OpenwireMessageEvaluationContext(message);
}
+
+ public String toString() {
+ return message.getMessageId().toString();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 04:02:38 2009
@@ -571,6 +571,10 @@
return isQueueReceiver;
}
+ public boolean isExclusive() {
+ return info.isExclusive();
+ }
+
/*
* (non-Javadoc)
*
@@ -704,6 +708,10 @@
return true;
}
+ public String toString() {
+ return info.getConsumerId().toString();
+ }
+
}
private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 04:02:38 2009
@@ -786,9 +786,7 @@
connection2.request(consumerInfo2);
// Second message should go to consumer 1 even though consumer 2 is
- // ready
- // for dispatch.
- connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ // ready for dispatch.
connection1.send(createMessage(producerInfo, destination, deliveryMode));
// Acknowledge the first 2 messages
@@ -803,6 +801,7 @@
// The last two messages should now go the the second consumer.
connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
for (int i = 0; i < 2; i++) {
Message m1 = receiveMessage(connection2);
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Jun 19 04:02:38 2009
@@ -445,6 +445,10 @@
return name + " on " + sourceQueue.getResourceName();
}
+ public boolean isExclusive() {
+ return false;
+ }
+
/*
* (non-Javadoc)
*
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri Jun 19 04:02:38 2009
@@ -21,23 +21,24 @@
import java.util.HashSet;
import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.util.Mapper;
abstract public class PartitionedQueue<K, V> extends AbstractFlowQueue<V> implements IPartitionedQueue<K, V> {
- private HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
+ protected HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
private HashMap<Integer, IQueue<K, V>> partitions = new HashMap<Integer, IQueue<K, V>>();
- protected Mapper<Integer, V> partitionMapper;
- private QueueStore<K, V> store;
+ protected QueueStore<K, V> store;
protected IDispatcher dispatcher;
- private boolean started;
- private boolean shutdown = false;
+ protected boolean started;
+ protected boolean shutdown = false;
protected QueueDescriptor queueDescriptor;
- private int basePriority = 0;
+ protected PersistencePolicy<V> persistencePolicy;
+ protected Mapper<Long, V> expirationMapper;
+ protected Mapper<K, V> keyMapper;
+ protected Mapper<Integer, V> partitionMapper;
+ protected int basePriority = 0;
public PartitionedQueue(String name) {
super(name);
@@ -50,7 +51,7 @@
return queueDescriptor;
}
- public IQueue<K, V> getPartition(int partitionKey) {
+ protected IQueue<K, V> getPartition(int partitionKey) {
boolean save = false;
IQueue<K, V> rc = null;
checkShutdown();
@@ -70,7 +71,10 @@
return rc;
}
-
+
+
+ abstract public IQueue<K, V> createPartition(int partitionKey);
+
/*
* (non-Javadoc)
*
@@ -81,20 +85,20 @@
if (basePriority != priority) {
basePriority = priority;
if (!shutdown) {
- for (IQueue<K, V> queue : partitions.values()) {
+ for (IQueue<K, V> queue : getPartitions()) {
queue.setDispatchPriority(basePriority);
}
}
}
}
}
-
+
public int getEnqueuedCount() {
checkShutdown();
- synchronized (partitions) {
+ synchronized (this) {
int count = 0;
- for (IQueue<K, V> queue : partitions.values()) {
+ for (IQueue<K, V> queue : getPartitions()) {
count += queue.getEnqueuedCount();
}
return count;
@@ -103,9 +107,9 @@
public synchronized long getEnqueuedSize() {
checkShutdown();
- synchronized (partitions) {
+ synchronized (this) {
long size = 0;
- for (IQueue<K, V> queue : partitions.values()) {
+ for (IQueue<K, V> queue : getPartitions()) {
if (queue != null) {
size += queue.getEnqueuedSize();
}
@@ -114,43 +118,44 @@
}
}
+
public void setStore(QueueStore<K, V> store) {
this.store = store;
}
public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) {
- // No-Op for now.
+ this.persistencePolicy = persistencePolicy;
}
public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
- // No-Op for now.
- }
-
- abstract public IQueue<K, V> createPartition(int partitionKey);
-
- public void addPartition(int partitionKey, IQueue<K, V> queue) {
- checkShutdown();
- synchronized (partitions) {
- partitions.put(partitionKey, queue);
- for (Subscription<V> sub : subscriptions) {
- queue.addSubscription(sub);
- queue.setDispatchPriority(basePriority);
- }
- }
+ this.expirationMapper = expirationMapper;
}
public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+ checkShutdown();
// No-op, only partitions should have stored values.
if (count > 0 || size > 0) {
throw new IllegalArgumentException("Partioned queues do not themselves hold values");
}
+ if (expirationMapper == null) {
+ expirationMapper = new Mapper<Long, V>() {
+
+ public Long map(V element) {
+ return -1L;
+ }
+ };
+ }
+ if (persistencePolicy == null) {
+ persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
+ }
}
+
public synchronized void start() {
if (!started) {
checkShutdown();
started = true;
- for (IQueue<K, V> partition : partitions.values()) {
+ for (IQueue<K, V> partition : getPartitions()) {
if (partition != null)
partition.start();
}
@@ -160,26 +165,27 @@
public synchronized void stop() {
if (started) {
started = false;
- for (IQueue<K, V> partition : partitions.values()) {
+ for (IQueue<K, V> partition : getPartitions()) {
if (partition != null)
partition.stop();
}
}
}
+
public void shutdown(boolean sync) {
- HashMap<Integer, IQueue<K, V>> partitions = null;
+ Collection <IQueue<K, V>> partitions = null;
synchronized (this) {
if (!shutdown) {
shutdown = true;
started = false;
}
- partitions = this.partitions;
+ partitions = getPartitions();
this.partitions = null;
}
if (partitions != null) {
- for (IQueue<K, V> partition : partitions.values()) {
+ for (IQueue<K, V> partition : partitions) {
if (partition != null)
partition.shutdown(sync);
}
@@ -188,9 +194,9 @@
public void addSubscription(Subscription<V> sub) {
checkShutdown();
- synchronized (partitions) {
+ synchronized (this) {
subscriptions.add(sub);
- Collection<IQueue<K, V>> values = partitions.values();
+ Collection<IQueue<K, V>> values = getPartitions();
for (IQueue<K, V> queue : values) {
queue.addSubscription(sub);
}
@@ -199,9 +205,9 @@
public boolean removeSubscription(Subscription<V> sub) {
checkShutdown();
- synchronized (partitions) {
+ synchronized (this) {
if (subscriptions.remove(sub)) {
- Collection<IQueue<K, V>> values = partitions.values();
+ Collection<IQueue<K, V>> values = getPartitions();
for (IQueue<K, V> queue : values) {
queue.removeSubscription(sub);
}
@@ -219,30 +225,42 @@
return partitionMapper;
}
+
public void add(V value, ISourceController<?> source) {
int partitionKey = partitionMapper.map(value);
- IQueue<K, V> partition = getPartition(partitionKey);
- partition.add(value, source);
+ getPartition(partitionKey).add(value, source);
}
public boolean offer(V value, ISourceController<?> source) {
int partitionKey = partitionMapper.map(value);
- IQueue<K, V> partition = getPartition(partitionKey);
- return partition.offer(value, source);
+ return getPartition(partitionKey).offer(value, source);
+ }
+
+ public void setKeyMapper(Mapper<K, V> keyMapper) {
+ this.keyMapper = keyMapper;
+ }
+
+ public void setAutoRelease(boolean autoRelease) {
+ this.autoRelease = autoRelease;
}
public void setDispatcher(IDispatcher dispatcher) {
checkShutdown();
this.dispatcher = dispatcher;
- synchronized (partitions) {
- Collection<IQueue<K, V>> values = partitions.values();
+ synchronized (this) {
+ Collection<IQueue<K, V>> values = getPartitions();
for (IQueue<K, V> queue : values) {
queue.setDispatcher(dispatcher);
}
}
}
+
+ protected Collection<IQueue<K, V>> getPartitions()
+ {
+ return partitions.values();
+ }
- private void checkShutdown() {
+ protected void checkShutdown() {
if (shutdown) {
throw new IllegalStateException(this + " is shutdown");
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Fri Jun 19 04:02:38 2009
@@ -17,32 +17,13 @@
package org.apache.activemq.queue;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
-import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PrioritySizeLimiter;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.util.Mapper;
-public class SharedPriorityQueue<K, V> extends AbstractFlowQueue<V> implements IPartitionedQueue<K, V> {
+public class SharedPriorityQueue<K, V> extends PartitionedQueue<K, V> {
- private final HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
- private final Mapper<Integer, V> priorityMapper;
private final ArrayList<SharedQueue<K, V>> partitions = new ArrayList<SharedQueue<K, V>>();
- private Mapper<K, V> keyMapper;
- private boolean autoRelease;
- private IDispatcher dispatcher;
private final PrioritySizeLimiter<V> limiter;
- private QueueStore<K, V> store;
- private PersistencePolicy<V> persistencePolicy;
- private boolean started;
- private QueueDescriptor queueDescriptor;
- private Mapper<Long, V> expirationMapper;
- private int basePriority = 0;
- private boolean shutdown = false;
public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
super(name);
@@ -50,154 +31,36 @@
queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
queueDescriptor.setQueueType(QueueDescriptor.SHARED_PRIORITY);
this.limiter = limiter;
- priorityMapper = limiter.getPriorityMapper();
+ super.setPartitionMapper(limiter.getPriorityMapper());
for (int i = 0; i < limiter.getPriorities(); i++) {
partitions.add(null);
}
}
- public synchronized void start() {
- if (!started) {
- checkShutdown();
- started = true;
- for (SharedQueue<K, V> partition : partitions) {
- if (partition != null)
- partition.start();
- }
- }
- }
-
- public synchronized void stop() {
- if (started) {
- started = false;
- for (SharedQueue<K, V> partition : partitions) {
- if (partition != null)
- partition.stop();
- }
- }
- }
-
+ @Override
public void shutdown(boolean sync) {
- ArrayList<SharedQueue<K, V>> partitions = null;
- synchronized (this) {
- if (!shutdown) {
- shutdown = true;
- started = false;
- }
- partitions = this.partitions;
- }
-
- if (partitions != null) {
- for (IQueue<K, V> partition : partitions) {
- if (partition != null)
- partition.shutdown(sync);
- }
- }
- }
-
- public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
- checkShutdown();
- // No-op, only partitions should have stored values.
- if (count > 0 || size > 0) {
- throw new IllegalArgumentException("Partioned queues do not themselves hold values");
- }
- if (expirationMapper == null) {
- expirationMapper = new Mapper<Long, V>() {
-
- public Long map(V element) {
- return -1L;
- }
-
- };
- }
- if (persistencePolicy == null) {
- persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
- }
- }
-
- public synchronized int getEnqueuedCount() {
- checkShutdown();
- int count = 0;
- for (SharedQueue<K, V> queue : partitions) {
- if (queue != null) {
- count += queue.getEnqueuedCount();
- }
+ try {
+ super.shutdown(sync);
+ } finally {
+ partitions.clear();
}
- return count;
}
+ /**
+ * Override with more efficient limiter lookup:
+ */
+ @Override
public synchronized long getEnqueuedSize() {
return limiter.getSize();
}
- public void setStore(QueueStore<K, V> store) {
- this.store = store;
- }
-
- public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) {
- this.persistencePolicy = persistencePolicy;
- }
-
- public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
- this.expirationMapper = expirationMapper;
- }
-
-
- @Override
- public void addSubscription(Subscription<V> sub) {
- synchronized (this) {
- checkShutdown();
- subscriptions.add(sub);
- for (SharedQueue<K, V> queue : partitions) {
- if (queue != null) {
- queue.addSubscription(sub);
- }
- }
- }
- }
-
- @Override
- public boolean removeSubscription(Subscription<V> sub) {
- synchronized (this) {
- if (subscriptions.remove(sub)) {
- for (SharedQueue<K, V> queue : partitions) {
- if (queue != null) {
- queue.removeSubscription(sub);
- }
- }
- return true;
- }
- }
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.queue.IQueue#setDispatchPriority(int)
- */
- public void setDispatchPriority(int priority) {
+ public IQueue<K, V> createPartition(int prio) {
synchronized (this) {
- if (basePriority != priority) {
- basePriority = priority;
- if (shutdown) {
- return;
- }
- for (int i = 0; i < limiter.getPriorities(); i++) {
- SharedQueue<K, V> queue = partitions.get(i);
- if (queue != null) {
- queue.setDispatchPriority(basePriority + i);
- }
- }
- }
+ return getPartition(prio, started);
}
}
- public IQueue<K, V> createPartition(int prio) {
- return getPartition(prio, false);
- }
-
- private IQueue<K, V> getPartition(int prio, boolean initialize) {
+ protected IQueue<K, V> getPartition(int prio, boolean initialize) {
synchronized (this) {
checkShutdown();
SharedQueue<K, V> queue = partitions.get(prio);
@@ -231,71 +94,4 @@
return queue;
}
}
-
- public QueueDescriptor getDescriptor() {
- return queueDescriptor;
- }
-
- public void add(V value, ISourceController<?> source) {
- int prio = priorityMapper.map(value);
- getPartition(prio, true).add(value, source);
- }
-
- public boolean offer(V value, ISourceController<?> source) {
- int prio = priorityMapper.map(value);
- return getPartition(prio, true).offer(value, source);
- }
-
- public void setKeyMapper(Mapper<K, V> keyMapper) {
- this.keyMapper = keyMapper;
- }
-
- public void setAutoRelease(boolean autoRelease) {
- this.autoRelease = autoRelease;
- }
-
- public void setDispatcher(IDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
- }
-
- private void checkShutdown() {
- if (shutdown) {
- throw new IllegalStateException(this + " is shutdown");
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.queue.IPollableFlowSource#isDispatchReady()
- */
- public boolean isDispatchReady() {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException();
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.queue.IPollableFlowSource#poll()
- */
- public V poll() {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException();
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.queue.IPollableFlowSource#pollingDispatch()
- */
- public boolean pollingDispatch() {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException();
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.flow.ISinkController.FlowControllable#flowElemAccepted(org.apache.activemq.flow.ISourceController, java.lang.Object)
- */
- public void flowElemAccepted(ISourceController<V> source, V elem) {
- // TODO Remove
- throw new UnsupportedOperationException();
-
- }
-
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Fri Jun 19 04:02:38 2009
@@ -67,6 +67,9 @@
private QueueStore<K, V> store;
private PersistencePolicy<V> persistencePolicy;
+ private SubscriptionContext exclusiveConsumer = null;
+ private int exclusiveConsumerCount = 0;
+
// Open consumers:
private final HashMap<Subscription<V>, SubscriptionContext> consumers = new HashMap<Subscription<V>, SubscriptionContext>();
private int startedConsumers = 0;
@@ -172,8 +175,15 @@
SubscriptionContext context = new SubscriptionContext(subscription);
SubscriptionContext old = consumers.put(subscription, context);
if (old != null) {
+ context.close();
consumers.put(subscription, old);
} else {
+ if (exclusiveConsumer == null) {
+ if (context.isExclusive()) {
+ exclusiveConsumer = context;
+ }
+ }
+
context.start();
}
}
@@ -184,6 +194,29 @@
SubscriptionContext old = consumers.remove(subscription);
if (old != null) {
old.close();
+
+ //Was this the exclusive consumer?
+ if (old == exclusiveConsumer) {
+ if (exclusiveConsumerCount > 0) {
+ for (SubscriptionContext context : consumers.values()) {
+ if (context.isExclusive()) {
+ exclusiveConsumer = context;
+ //Update the dispatch list:
+ context.updateDispatchList();
+ break;
+ }
+ }
+ } else {
+ //Otherwise add the remaining subs to appropriate dispatch
+ //lists:
+ exclusiveConsumer = null;
+ for (SubscriptionContext context : consumers.values()) {
+ if (!context.sub.isBrowser()) {
+ context.updateDispatchList();
+ }
+ }
+ }
+ }
return true;
}
return false;
@@ -397,6 +430,9 @@
SubscriptionContext nextConsumer = consumer.getNext();
switch (consumer.offer(next)) {
case ACCEPTED:
+ if (DEBUG)
+ System.out.println("Dispatched " + next.getElement() + " to " + consumer);
+
// Rotate list so this one is last next time:
sharedConsumers.rotate();
interested = true;
@@ -469,6 +505,9 @@
SubscriptionContext(Subscription<V> target) {
this.sub = target;
this.cursor = openCursor(target.toString(), true, !sub.isBrowser());
+ if (isExclusive()) {
+ exclusiveConsumerCount++;
+ }
cursor.setCursorReadyListener(new CursorReadyListener() {
public void onElementReady() {
if (!isLinked()) {
@@ -478,10 +517,15 @@
});
}
+ public boolean isExclusive() {
+ return sub.isExclusive() && !sub.isBrowser();
+ }
+
public void start() {
if (!isStarted) {
isStarted = true;
if (!sub.isBrowser()) {
+
if (sub.hasSelector()) {
activeSelectorSubs++;
}
@@ -504,6 +548,7 @@
// If started remove this from any dispatch list
if (isStarted) {
if (!sub.isBrowser()) {
+
if (sub.hasSelector()) {
activeSelectorSubs--;
}
@@ -517,6 +562,10 @@
}
public void close() {
+ if (isExclusive()) {
+ exclusiveConsumerCount--;
+ }
+
stop();
}
@@ -550,13 +599,22 @@
return false;
}
- // TODO Even if there are subscriptions with selectors present
- // we can still join the shared cursor as long as there is at
- // least one ready selector-less sub.
boolean join = false;
- if (activeSelectorSubs == 0) {
+ //If we are the exlusive consumer then we join the shared
+ //cursor:
+ if (exclusiveConsumer == this) {
+ join = true;
+ }
+ //Otherwise if we aren't we won't be joining anything!
+ else if (exclusiveConsumer != null) {
+ return false;
+ } else if (activeSelectorSubs == 0) {
join = true;
} else {
+
+ // TODO Even if there are subscriptions with selectors present
+ // we can still join the shared cursor as long as there is at
+ // least one ready selector-less sub.
cursor.getNext();
if (queue.isEmpty() || cursor.compareTo(sharedCursor) >= 0) {
join = true;
@@ -578,9 +636,14 @@
/**
* Adds to subscription to the appropriate dispatch list:
*/
- private final void updateDispatchList() {
+ final void updateDispatchList() {
if (!checkJoinShared()) {
+ //Otherwise if we're not the exclusive consumer
+ if (!sub.isBrowser() && exclusiveConsumer != null) {
+ return;
+ }
+
// Make sure our cursor is activated:
cursor.activate();
// If our next element is paged out
@@ -641,9 +704,7 @@
if (callback == null) {
qe.acknowledge();
}
- }
- else
- {
+ } else {
qe.setAcquired(null);
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java Fri Jun 19 04:02:38 2009
@@ -57,6 +57,15 @@
* @return True if this is a subscription browser.
*/
public boolean isBrowser();
+
+ /**
+ * Indicates that the subscription is exclusive. When there at least one
+ * exclusive subscription on a shared queue, the queue will dispatch to
+ * only one such consumer while there is at least one connected.
+ *
+ * @return True if the Subscription is exclusive.
+ */
+ public boolean isExclusive();
/**
* Returns true if the Subscription has a selector. If true
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java Fri Jun 19 04:02:38 2009
@@ -36,7 +36,7 @@
private final MockStoreAdapater store = new MockStoreAdapater();
private static final PersistencePolicy<Message> NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY<Message>();
private static final boolean USE_OLD_QUEUE = false;
-
+
private IQueue<Long, Message> createQueue() {
if (partitionMapper != null) {
@@ -117,6 +117,10 @@
return true;
}
+ public boolean isExclusive() {
+ return false;
+ }
+
public IFlowSink<Message> getSink() {
return dt.getSink();
}
@@ -133,7 +137,7 @@
public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
return getSink().offer(elem, controller);
}
-
+
public void add(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
getSink().add(elem, controller);
}
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri Jun 19 04:02:38 2009
@@ -489,6 +489,13 @@
public boolean autoCreateDestination() {
return true;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.Subscription#isExclusive()
+ */
+ public boolean isExclusive() {
+ return false;
+ }
}
private void sendError(String message) {