You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/29 17:20:13 UTC
svn commit: r780014 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/activemq/...
Author: chirino
Date: Fri May 29 15:20:12 2009
New Revision: 780014
URL: http://svn.apache.org/viewvc?rev=780014&view=rev
Log:
Applying patch from https://issues.apache.org/activemq/browse/AMQ-2271
Thanks, Colin.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri May 29 15:20:12 2009
@@ -35,8 +35,8 @@
}*/
@Override
- public void remove(long size) {
- super.remove(size);
+ public void remove(int count, long size) {
+ super.remove(count, size);
if (!clientMode) {
available += size;
if (available >= capacity - resumeThreshold) {
@@ -51,7 +51,7 @@
}
public void onProtocolCredit(int credit) {
- remove(credit);
+ remove(1, credit);
}
public int getElementSize(E m) {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Fri May 29 15:20:12 2009
@@ -71,7 +71,7 @@
private Mapper<Integer, MessageDelivery> partitionMapper;
- private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
+ private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
// Be default we don't page out elements to disk.
private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
@@ -115,7 +115,7 @@
private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
// Be default we don't page out elements to disk.
- private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+ private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
@@ -198,7 +198,7 @@
ExclusivePersistentQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-
+
}
}
@@ -253,10 +253,12 @@
}
public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
- Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
- ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
- ret.addAll(c);
- return ret;
+ synchronized (this) {
+ Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
+ ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+ ret.addAll(c);
+ return ret;
+ }
}
public ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueue(String name) {
@@ -275,6 +277,15 @@
return queue;
}
+ public Collection<ExclusivePersistentQueue<Long, MessageDelivery>> getDurableQueues() {
+ synchronized (this) {
+ Collection<ExclusivePersistentQueue<Long, MessageDelivery>> c = durableQueues.values();
+ ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>> ret = new ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>>(c.size());
+ ret.addAll(c);
+ return ret;
+ }
+ }
+
public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
IQueue<Long, MessageDelivery> queue = null;
@@ -294,7 +305,7 @@
private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
ExclusivePersistentQueue<Long, MessageDelivery> queue;
-
+
SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
@Override
public int getElementSize(MessageDelivery elem) {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Fri May 29 15:20:12 2009
@@ -32,15 +32,18 @@
Queue(IQueue<Long, MessageDelivery> queue) {
this.queue = queue;
}
-
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+ * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
*/
public void deliver(MessageDelivery message, ISourceController<?> source) {
queue.add(message, source);
}
-
+
public final void addSubscription(final Subscription<MessageDelivery> sub) {
queue.addSubscription(sub);
}
@@ -59,6 +62,12 @@
}
}
+ public void shutdown(boolean sync) throws Exception {
+ if (queue != null) {
+ queue.shutdown(sync);
+ }
+ }
+
public boolean hasSelector() {
return false;
}
@@ -95,16 +104,24 @@
this.queue = queue;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+ * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
*/
public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
this.subscription = subscription;
queue.addSubscription(subscription);
}
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+ * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
*/
public void disconnect(Subscription<MessageDelivery> context) {
queue.removeSubscription(subscription);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Fri May 29 15:20:12 2009
@@ -22,6 +22,7 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.AbstractFlowQueue;
import org.apache.activemq.queue.ExclusivePersistentQueue;
import org.apache.activemq.queue.IQueue;
@@ -95,8 +96,13 @@
return;
}
for (Queue queue : queues.values()) {
- queue.stop();
+ queue.shutdown(true);
}
+
+ for (AbstractFlowQueue<MessageDelivery> queue : queueStore.getDurableQueues()) {
+ queue.shutdown(true);
+ }
+
started = false;
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri May 29 15:20:12 2009
@@ -427,6 +427,7 @@
selector = parseSelector(info);
limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+ @Override
public int getElementSize(MessageDelivery m) {
return m.getFlowLimiterSize();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri May 29 15:20:12 2009
@@ -304,6 +304,7 @@
if (ackMode != StompSubscription.AUTO_ACK) {
Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
limiter = new WindowLimiter<MessageDelivery>(true, flow, connection.getOutputWindowSize(), connection.getOutputResumeThreshold()) {
+ @Override
public int getElementSize(MessageDelivery m) {
return m.getFlowLimiterSize();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Fri May 29 15:20:12 2009
@@ -371,7 +371,7 @@
}
synchronized (opQueue) {
- this.storeLimiter.remove(release);
+ this.storeLimiter.remove(1, release);
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri May 29 15:20:12 2009
@@ -185,7 +185,7 @@
Thread joinThread = null;
synchronized (this) {
if (thread != null) {
- dispatch(new RunnableAdapter() {
+ dispatchInternal(new RunnableAdapter() {
public void run() {
running = false;
}
@@ -215,8 +215,10 @@
public void run() {
- // Inform the dispatcher that we have started:
- pooledDispatcher.onDispatcherStarted((D) this);
+ if (pooledDispatcher != null) {
+ // Inform the dispatcher that we have started:
+ pooledDispatcher.onDispatcherStarted((D) this);
+ }
PriorityDispatchContext pdc;
try {
@@ -288,7 +290,9 @@
} catch (Throwable thrown) {
thrown.printStackTrace();
} finally {
- pooledDispatcher.onDispatcherStopped((D) this);
+ if (pooledDispatcher != null) {
+ pooledDispatcher.onDispatcherStopped((D) this);
+ }
cleanup();
}
}
@@ -368,6 +372,14 @@
return true;
}
+ //Special dispatch method that allow high priority dispatch:
+ private final void dispatchInternal(Dispatchable dispatchable, int priority)
+ {
+ PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+ context.priority = priority;
+ context.requestDispatch();
+ }
+
/*
* (non-Javadoc)
*
@@ -424,7 +436,14 @@
}
private final D getCurrentDispatcher() {
- return pooledDispatcher.getCurrentDispatcher();
+ if (pooledDispatcher != null) {
+ return pooledDispatcher.getCurrentDispatcher();
+ } else if (Thread.currentThread() == thread) {
+ return (D) this;
+ } else {
+ return null;
+ }
+
}
private final PooledDispatchContext<D> getCurrentDispatchContext() {
@@ -460,7 +479,7 @@
this.dispatchable = dispatchable;
this.name = name;
this.currentOwner = (D) PriorityDispatcher.this;
- if (persistent) {
+ if (persistent && pooledDispatcher != null) {
this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
} else {
this.tracker = null;
@@ -549,7 +568,9 @@
if (closed) {
return;
}
-
+
+ priority = Math.min(priority, MAX_USER_PRIORITY);
+
if (this.priority == priority) {
return;
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Fri May 29 15:20:12 2009
@@ -16,52 +16,107 @@
*/
package org.apache.activemq.dispatch;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
- private final boolean DEBUG = false;
+ private final boolean DEBUG = true;
+
+ //TODO: Added plumbing for periodic rebalancing which we should
+ //consider implementing
+ private static final boolean ENABLE_UPDATES = false;
+ private final ArrayList<D> dispatchers = new ArrayList<D>();
+
+ private AtomicBoolean running = new AtomicBoolean(false);
+ private boolean needsUpdate = false;
+ private static final int FREQUENCY = 5000;
+ private boolean scheduled = false;
+ private final Runnable timerCallback;
public SimpleLoadBalancer() {
- }
- @SuppressWarnings("hiding")
- private class ExecutionStats<D extends IDispatcher> {
- final PooledDispatchContext<D> target;
- final PooledDispatchContext<D> source;
- int count;
+ timerCallback = new Runnable() {
+ public final void run() {
+ if (running.get()) {
+ rebalance();
+ synchronized (dispatchers) {
+ scheduled = false;
+ scheduleNext();
+ }
+ }
+ }
+ };
- ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
- this.target = target;
- this.source = source;
- }
+ }
- public String toString() {
- return "Connection from: " + source + " to " + target;
+ private void rebalance() {
+ if (!needsUpdate) {
+ return;
}
+ // TODO Auto-generated method stub
}
- public void onDispatcherStarted(D dispatcher) {
-
+ public void start() {
+ if (running.compareAndSet(false, true)) {
+ scheduleNext();
+ }
}
- public void onDispatcherStopped(D dispatcher) {
+ private void scheduleNext() {
+ if (!ENABLE_UPDATES) {
+ return;
+ }
+ synchronized (dispatchers) {
+ if (!scheduled) {
+ if (!dispatchers.isEmpty()) {
+ dispatchers.get(0).schedule(timerCallback, FREQUENCY, TimeUnit.MILLISECONDS);
+ scheduled = true;
+ }
+ }
+ }
+ }
+ public void stop() {
+ running.compareAndSet(true, false);
}
- public void start() {
+ public synchronized final void onDispatcherStarted(D dispatcher) {
+ dispatchers.add(dispatcher);
+ scheduleNext();
}
- public void stop() {
+ /**
+ * A Dispatcher must call this when exiting it's dispatch loop
+ */
+ public void onDispatcherStopped(D dispatcher) {
+ dispatchers.remove(dispatcher);
}
-
+
public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
return new SimpleExecutionTracker(context);
}
+ private static class ExecutionStats<D extends IDispatcher> {
+ final PooledDispatchContext<D> target;
+ final PooledDispatchContext<D> source;
+ int count;
+
+ ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
+ this.target = target;
+ this.source = source;
+ }
+
+ public String toString() {
+ return "Connection from: " + source + " to " + target;
+ }
+ }
+
private class SimpleExecutionTracker implements ExecutionTracker<D> {
private final HashMap<PooledDispatchContext<D>, ExecutionStats<D>> sources = new HashMap<PooledDispatchContext<D>, ExecutionStats<D>>();
private final PooledDispatchContext<D> context;
@@ -102,6 +157,7 @@
if (singleSource == null && sources.isEmpty()) {
singleSource = callingContext;
ExecutionStats<D> stats = new ExecutionStats<D>(callingContext, context);
+ stats.count++;
sources.put(callingContext, stats);
// If this context only has a single source
@@ -121,6 +177,7 @@
stats = new ExecutionStats<D>(callingContext, context);
sources.put(callingContext, stats);
}
+ stats.count++;
if (singleSource != null) {
singleSource = null;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java Fri May 29 15:20:12 2009
@@ -24,6 +24,8 @@
public boolean add(int count, long size);
+ public void remove(int count, long size);
+
public long getCapacity();
public long getSize();
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java Fri May 29 15:20:12 2009
@@ -40,7 +40,7 @@
private class Priority extends AbstractLimiter<E> implements IFlowSizeLimiter<E> {
final int priority;
- int size;
+ long size;
int reserved;
private boolean throttled;
@@ -99,16 +99,16 @@
if (reserved > 0) {
int res = reserved;
reserved = 0;
- remove(res);
+ remove(1, res);
}
}
public void remove(E elem) {
int size = sizeMapper.map(elem);
- remove(size);
+ remove(1, size);
}
- protected void remove(int s) {
+ public void remove(int c, long s) {
size -= s;
totalSize -= s;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java Fri May 29 15:20:12 2009
@@ -44,7 +44,7 @@
}
public final void remove(E elem) {
- remove(getElementSize(elem));
+ remove(1, getElementSize(elem));
}
public void reserve(E elem) {
@@ -55,11 +55,11 @@
if (reserved > 0) {
long res = reserved;
reserved = 0;
- remove(res);
+ remove(1, res);
}
}
- public void remove(long s) {
+ public void remove(int count, long s) {
this.size -= s;
if (size < 0) {
Exception ie = new IllegalStateException("Size Negative!" + size);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Fri May 29 15:20:12 2009
@@ -45,6 +45,7 @@
thrown.printStackTrace();
}
};
+ protected boolean started;
AbstractFlowQueue() {
super();
@@ -80,6 +81,37 @@
return this;
}
+ public synchronized void start() {
+ if (!started) {
+ started = true;
+ if (isDispatchReady()) {
+ notifyReady();
+ }
+ }
+ }
+
+ public synchronized void stop() {
+ started = false;
+ }
+
+ /**
+ * Calls stop and cleans up resources associated with the queue.
+ * @param sync
+ */
+ public void shutdown(boolean sync) {
+ stop();
+ DispatchContext dc = null;
+ synchronized (this) {
+ dc = dispatchContext;
+ dispatchContext = null;
+
+ }
+
+ if (dc != null) {
+ dc.close(sync);
+ }
+ }
+
/**
* Sets an asynchronous dispatcher for this source. As elements become
* available they will be dispatched to the worker pool.
@@ -143,7 +175,7 @@
if (readyListeners == null) {
return;
}
-
+
if (!readyListeners.isEmpty()) {
for (FlowReadyListener<E> listener : readyListeners) {
listener.onFlowReady(this);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java Fri May 29 15:20:12 2009
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.queue;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -61,9 +61,9 @@
private final Expirator expirator;
private final QueueStore<?, V> queueStore;
private final ElementLoader loader;
- public final QueueDescriptor queueDescriptor;
+ private final QueueDescriptor queueDescriptor;
private final Object mutex;
-
+
public CursoredQueue(PersistencePolicy<V> persistencePolicy, Mapper<Long, V> expirationMapper, Flow flow, QueueDescriptor queueDescriptor, QueueStore<?, V> store, Object mutex) {
this.persistencePolicy = persistencePolicy;
this.mutex = mutex;
@@ -197,7 +197,8 @@
}
/**
- * @return The first sequence number in the queue (or the next sequence number if it is empty)
+ * @return The first sequence number in the queue (or the next sequence
+ * number if it is empty)
*/
public long getFirstSequence() {
if (queue.isEmpty()) {
@@ -278,8 +279,7 @@
private boolean paused;
- public Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements,
- IFlowController<QueueElement<V>> memoryController) {
+ public Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements, IFlowController<QueueElement<V>> memoryController) {
this.name = name;
this.queue = queue.queue;
this.loader = queue.loader;
@@ -336,13 +336,11 @@
return !paused && activated && pageInElements;
}
- public void close()
- {
+ public void close() {
deactivate();
cQueue.openCursors.remove(this);
}
-
-
+
public final void reset(long sequence) {
updateSequence(sequence);
updateCurrent(null);
@@ -695,8 +693,9 @@
}
/**
- * @param l Set the highest sequence number to which this
- * cursor can advance.
+ * @param l
+ * Set the highest sequence number to which this cursor can
+ * advance.
*/
public void setLimit(long l) {
limit = l;
@@ -710,7 +709,7 @@
final CursoredQueue<V> queue;
V elem;
- int size = -1;
+ private int size = -1;
long expiration = -1;
boolean redelivered = false;
@@ -730,8 +729,8 @@
// Indicates whether the element has been saved in the store.
boolean saved = false;
- boolean deleted = false;
- boolean acquired = false;
+ private boolean deleted = false;
+ private Subscription<V> owner = null;
public QueueElement(V elem, long sequence, CursoredQueue<V> queue) {
this.elem = elem;
@@ -764,6 +763,10 @@
return sequence;
}
+ public final int getLimiterSize() {
+ return size;
+ }
+
public final void addHardRef() {
hardRefs++;
// Page in the element (providing it wasn't removed):
@@ -799,8 +802,8 @@
assert softRefs >= 0;
}
- public final void setAcquired(boolean val) {
- this.acquired = val;
+ public final void setAcquired(Subscription<V> owner) {
+ this.owner = owner;
}
public final void acknowledge() {
@@ -825,7 +828,7 @@
}
public final void unacquire(ISourceController<?> source) {
- acquired = false;
+ owner = null;
if (isExpired()) {
acknowledge();
} else {
@@ -843,7 +846,7 @@
// Don't page out of there is a hard ref to the element
// or if it is acquired (since we need the element
// during delete:
- if (!deleted && (hardRefs > 0 || acquired)) {
+ if (!deleted && (hardRefs > 0 || isAcquired())) {
return;
}
@@ -854,7 +857,7 @@
if (!queue.getPersistencePolicy().isPersistent(elem)) {
save(controller, true);
if (DEBUG)
- System.out.println("Paged out element: " + this);
+ System.out.println("Paging out non-pers element: " + this);
}
// If save is pending don't unload until the save has
@@ -864,6 +867,8 @@
}
}
+ if (DEBUG)
+ System.out.println("Paged out element: " + this);
elem = null;
}
@@ -886,7 +891,7 @@
// Otherwise as long as the element isn't acquired we can unload
// it. If it is acquired we keep the soft ref arount to remember
// that it is.
- else if (!acquired && queue.getLoader().isPageOutPlaceHolders()) {
+ else if (!isAcquired() && queue.getLoader().isPageOutPlaceHolders()) {
loaded = false;
@@ -1008,7 +1013,11 @@
}
public final boolean isAcquired() {
- return acquired || deleted;
+ return owner != null;
+ }
+
+ public Subscription<V> getOwner() {
+ return owner;
}
public final long getExpiration() {
@@ -1099,7 +1108,7 @@
}
public String toString() {
- return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " aquired: " + acquired;
+ return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
}
}
@@ -1344,7 +1353,7 @@
if (!persistencePolicy.isPagingEnabled()) {
qe.addHardRef();
}
-
+
// Persist the element if required:
if (persistencePolicy.isPersistent(qe.elem)) {
// For now base decision on whether to delay flush on
@@ -1353,10 +1362,10 @@
boolean delayable = !openCursors.isEmpty();
qe.save(source, delayable);
}
-
+
// Check with cursors to see if any of them have room for it
// in memory:
- if(persistencePolicy.isPagingEnabled()) {
+ if (persistencePolicy.isPagingEnabled()) {
// Otherwise check with any other open cursor to see if
// it can hang on to the element:
@@ -1393,8 +1402,6 @@
qe.unload(source);
}
}
-
-
}
@@ -1444,7 +1451,7 @@
public void releaseBlock(Cursor<V> cursor, long block) {
HashSet<Cursor<V>> cursors = reservedBlocks.get(block);
if (cursors == null) {
- if (true || DEBUG)
+ if (DEBUG)
System.out.println(this + " removeBlockInterest " + block + ", no cursors" + cursor);
} else {
if (cursors.remove(cursor)) {
@@ -1560,4 +1567,17 @@
return "QueueLoader " + CursoredQueue.this;
}
}
+
+ /**
+ * @param sync
+ */
+ public void shutdown(boolean sync) {
+ stop();
+ if (!openCursors.isEmpty()) {
+ ArrayList<Cursor<V>> cursors = new ArrayList<Cursor<V>>(openCursors.size());
+ for (Cursor<V> cursor : cursors) {
+ cursor.close();
+ }
+ }
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Fri May 29 15:20:12 2009
@@ -35,7 +35,6 @@
private CursoredQueue<E> queue;
private final FlowController<E> controller;
private final IFlowSizeLimiter<E> limiter;
- private boolean started = true;
private Cursor<E> cursor;
private final QueueDescriptor queueDescriptor;
private PersistencePolicy<E> persistencePolicy;
@@ -123,7 +122,7 @@
synchronized (ExclusivePersistentQueue.this) {
E elem = qe.getElement();
if (qe.delete()) {
- if (!qe.acquired) {
+ if (!qe.isAcquired()) {
controller.elementDispatched(elem);
}
}
@@ -147,8 +146,9 @@
FlowController<QueueElement<E>> memoryController = null;
if (persistencePolicy.isPagingEnabled()) {
IFlowSizeLimiter<QueueElement<E>> limiter = new SizeLimiter<QueueElement<E>>(persistencePolicy.getPagingInMemorySize(), persistencePolicy.getPagingInMemorySize() / 2) {
+ @Override
public int getElementSize(QueueElement<E> qe) {
- return qe.size;
+ return qe.getLimiterSize();
};
};
@@ -232,21 +232,25 @@
throw new IllegalStateException("Not initialized");
}
if (!started) {
- started = true;
- if (isDispatchReady()) {
- notifyReady();
- }
+ super.start();
queue.start();
}
}
public synchronized void stop() {
if (started) {
- started = false;
+ super.stop();
queue.stop();
}
}
+ public void shutdown(boolean sync) {
+ super.shutdown(sync);
+ synchronized (this) {
+ queue.shutdown(sync);
+ }
+ }
+
public FlowController<E> getFlowController(Flow flow) {
return controller;
}
@@ -272,13 +276,15 @@
SubscriptionDeliveryCallback callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
// See if the sink has room:
+ qe.setAcquired(subscription);
if (subscription.offer(qe.elem, sourceController, callback)) {
- qe.setAcquired(true);
controller.elementDispatched(qe.getElement());
// If remove on dispatch acknowledge now:
if (callback == null) {
qe.acknowledge();
}
+ } else {
+ qe.setAcquired(null);
}
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Fri May 29 15:20:12 2009
@@ -27,7 +27,6 @@
public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
private final LinkedList<E> queue = new LinkedList<E>();
private final FlowController<E> controller;
- private boolean started = true;
/**
* Creates a flow queue that can handle multiple flows.
@@ -57,14 +56,6 @@
}
}
- public synchronized void start() {
- started = true;
- }
-
- public synchronized void stop() {
- started = false;
- }
-
public FlowController<E> getFlowController(Flow flow) {
return controller;
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Fri May 29 15:20:12 2009
@@ -21,7 +21,7 @@
import org.apache.activemq.queue.QueueStore.PersistentQueue;
import org.apache.activemq.util.Mapper;
-public interface IQueue<K, V> extends IFlowSink<V>, PersistentQueue<K,V>{
+public interface IQueue<K, V> extends IFlowSink<V>, PersistentQueue<K, V> {
/**
* @return the number of elements currently held by the queue.
@@ -51,14 +51,14 @@
public boolean removeSubscription(Subscription<V> sub);
/**
- * Sets a mapper returning the expiration time for elements in this
- * queue. A positive value indicates that the message has an expiration
- * time.
+ * Sets a mapper returning the expiration time for elements in this queue. A
+ * positive value indicates that the message has an expiration time.
*
- * @param expirationMapper The expiration mapper.
+ * @param expirationMapper
+ * The expiration mapper.
*/
public void setExpirationMapper(Mapper<Long, V> expirationMapper);
-
+
/**
* Sets the dispatcher for the queue.
*
@@ -68,6 +68,18 @@
public void setDispatcher(IDispatcher dispatcher);
/**
+ * Sets the base dispatch priority for the queue. Setting to higher value
+ * will increase the preference with which the dispatcher dispatches the
+ * queue. If the queue itself is priority based, the queue may further
+ * increase its dispatch priority based on the priority of elements that it
+ * holds.
+ *
+ * @param priority
+ * The base priority for the queue
+ */
+ public void setDispatchPriority(int priority);
+
+ /**
* Starts the queue.
*/
public void start();
@@ -81,4 +93,17 @@
*/
public void stop();
+ /**
+ * The queue is stopped via {@link #stop()} then shutdown. Once shutdown an
+ * {@link IQueue} cannot be restarted. Attempts to manipulate the queue once
+ * the queue is shutdown will throw an {@link IllegalStateException} unless
+ * otherwise documented.
+ *
+ * @param sync
+ * If true will cause the calling thread to block until all
+ * resources held by the queue are cleaned up. Otherwise, the
+ * queue shutdown will proceed asynchronously.
+ */
+ public void shutdown(boolean sync);
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri May 29 15:20:12 2009
@@ -21,6 +21,7 @@
import java.util.HashSet;
import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
@@ -35,7 +36,9 @@
private QueueStore<K, V> store;
protected IDispatcher dispatcher;
private boolean started;
+ private boolean shutdown = false;
protected QueueStore.QueueDescriptor queueDescriptor;
+ private int basePriority = 0;
public PartitionedQueue(String name) {
super(name);
@@ -51,6 +54,7 @@
public IQueue<K, V> getPartition(int partitionKey) {
boolean save = false;
IQueue<K, V> rc = null;
+ checkShutdown();
synchronized (partitions) {
rc = partitions.get(partitionKey);
if (rc == null) {
@@ -68,19 +72,38 @@
return rc;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.queue.IQueue#setDispatchPriority(int)
+ */
+ public void setDispatchPriority(int priority) {
+ synchronized (this) {
+ if (basePriority != priority) {
+ basePriority = priority;
+ if (!shutdown) {
+ for (IQueue<K, V> queue : partitions.values()) {
+ queue.setDispatchPriority(basePriority);
+ }
+ }
+ }
+ }
+ }
+
public int getEnqueuedCount() {
+ checkShutdown();
synchronized (partitions) {
+
int count = 0;
for (IQueue<K, V> queue : partitions.values()) {
- if (queue != null) {
- count += queue.getEnqueuedCount();
- }
+ count += queue.getEnqueuedCount();
}
return count;
}
}
public synchronized long getEnqueuedSize() {
+ checkShutdown();
synchronized (partitions) {
long size = 0;
for (IQueue<K, V> queue : partitions.values()) {
@@ -107,10 +130,12 @@
abstract public IQueue<K, V> createPartition(int partitionKey);
public void addPartition(int partitionKey, IQueue<K, V> queue) {
+ checkShutdown();
synchronized (partitions) {
partitions.put(partitionKey, queue);
for (Subscription<V> sub : subscriptions) {
queue.addSubscription(sub);
+ queue.setDispatchPriority(basePriority);
}
}
}
@@ -124,6 +149,7 @@
public synchronized void start() {
if (!started) {
+ checkShutdown();
started = true;
for (IQueue<K, V> partition : partitions.values()) {
if (partition != null)
@@ -142,7 +168,27 @@
}
}
+ public void shutdown(boolean sync) {
+ HashMap<Integer, IQueue<K, V>> partitions = null;
+ synchronized (this) {
+ if (!shutdown) {
+ shutdown = true;
+ started = false;
+ }
+ partitions = this.partitions;
+ this.partitions = null;
+ }
+
+ if (partitions != null) {
+ for (IQueue<K, V> partition : partitions.values()) {
+ if (partition != null)
+ partition.shutdown(sync);
+ }
+ }
+ }
+
public void addSubscription(Subscription<V> sub) {
+ checkShutdown();
synchronized (partitions) {
subscriptions.add(sub);
Collection<IQueue<K, V>> values = partitions.values();
@@ -153,6 +199,7 @@
}
public boolean removeSubscription(Subscription<V> sub) {
+ checkShutdown();
synchronized (partitions) {
if (subscriptions.remove(sub)) {
Collection<IQueue<K, V>> values = partitions.values();
@@ -186,6 +233,7 @@
}
public void setDispatcher(IDispatcher dispatcher) {
+ checkShutdown();
this.dispatcher = dispatcher;
synchronized (partitions) {
Collection<IQueue<K, V>> values = partitions.values();
@@ -194,4 +242,10 @@
}
}
}
+
+ private void checkShutdown() {
+ if (shutdown) {
+ throw new IllegalStateException(this + " is shutdown");
+ }
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Fri May 29 15:20:12 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.queue;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import org.apache.activemq.dispatch.IDispatcher;
@@ -41,6 +42,8 @@
private boolean started;
private QueueStore.QueueDescriptor queueDescriptor;
private Mapper<Long, V> expirationMapper;
+ private int basePriority = 0;
+ private boolean shutdown = false;
public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
super(name);
@@ -56,6 +59,7 @@
public synchronized void start() {
if (!started) {
+ checkShutdown();
started = true;
for (SharedQueue<K, V> partition : partitions) {
if (partition != null)
@@ -74,7 +78,26 @@
}
}
+ public void shutdown(boolean sync) {
+ ArrayList<SharedQueue<K, V>> partitions = null;
+ synchronized (this) {
+ if (!shutdown) {
+ shutdown = true;
+ started = false;
+ }
+ partitions = this.partitions;
+ }
+
+ if (partitions != null) {
+ for (IQueue<K, V> partition : partitions) {
+ if (partition != null)
+ partition.shutdown(sync);
+ }
+ }
+ }
+
public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+ checkShutdown();
// No-op, only partitions should have stored values.
if (count > 0 || size > 0) {
throw new IllegalArgumentException("Partioned queues do not themselves hold values");
@@ -94,6 +117,7 @@
}
public synchronized int getEnqueuedCount() {
+ checkShutdown();
int count = 0;
for (SharedQueue<K, V> queue : partitions) {
if (queue != null) {
@@ -125,6 +149,7 @@
public void addSubscription(Subscription<V> sub) {
synchronized (this) {
+ checkShutdown();
subscriptions.add(sub);
for (SharedQueue<K, V> queue : partitions) {
if (queue != null) {
@@ -148,18 +173,41 @@
return false;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.queue.IQueue#setDispatchPriority(int)
+ */
+ public void setDispatchPriority(int priority) {
+ synchronized (this) {
+ if (basePriority != priority) {
+ basePriority = priority;
+ if (shutdown) {
+ return;
+ }
+ for (int i = 0; i < limiter.getPriorities(); i++) {
+ SharedQueue<K, V> queue = partitions.get(i);
+ if (queue != null) {
+ queue.setDispatchPriority(basePriority + i);
+ }
+ }
+ }
+ }
+ }
+
public IQueue<K, V> createPartition(int prio) {
return getPartition(prio, false);
}
private IQueue<K, V> getPartition(int prio, boolean initialize) {
synchronized (this) {
+ checkShutdown();
SharedQueue<K, V> queue = partitions.get(prio);
if (queue == null) {
queue = new SharedQueue<K, V>(getResourceName() + "$" + prio, limiter.getPriorityLimter(prio), this);
queue.setAutoRelease(autoRelease);
queue.setDispatcher(dispatcher);
- queue.setDispatchPriority(prio);
+ queue.setDispatchPriority(basePriority + prio);
queue.setKeyMapper(keyMapper);
queue.setStore(store);
queue.setPersistencePolicy(persistencePolicy);
@@ -172,8 +220,7 @@
queue.initialize(0, 0, 0, 0);
onFlowOpened(queue.getFlowControler());
}
-
-
+
if (started) {
queue.start();
}
@@ -213,4 +260,11 @@
this.dispatcher = dispatcher;
super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
}
+
+ private void checkShutdown() {
+ if (shutdown) {
+ throw new IllegalStateException(this + " is shutdown");
+ }
+ }
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Fri May 29 15:20:12 2009
@@ -56,7 +56,6 @@
// Limiter/Controller for the size of the queue:
private FlowController<V> inputController;
private final IFlowSizeLimiter<V> sizeLimiter;
- private final boolean RELEASE_ON_ACQUISITION = true;
private final QueueStore.QueueDescriptor queueDescriptor;
@@ -85,7 +84,6 @@
private final LinkedNodeList<SubscriptionContext> trailingConsumers = new LinkedNodeList<SubscriptionContext>();
private boolean initialized = false;
- private boolean started = false;
private Mapper<Long, V> expirationMapper;
@@ -149,7 +147,6 @@
@Override
protected int getElementSize(V elem) {
- // TODO Auto-generated method stub
return sizeLimiter.getElementSize(elem);
}
@@ -215,8 +212,9 @@
FlowController<QueueElement<V>> controller = null;
if (pageInElements && persistencePolicy.isPagingEnabled() && sizeLimiter.getCapacity() > persistencePolicy.getPagingInMemorySize()) {
IFlowSizeLimiter<QueueElement<V>> limiter = new SizeLimiter<QueueElement<V>>(persistencePolicy.getPagingInMemorySize(), persistencePolicy.getPagingInMemorySize() / 2) {
+ @Override
public int getElementSize(QueueElement<V> qe) {
- return qe.size;
+ return qe.getLimiterSize();
};
};
@@ -264,11 +262,10 @@
final void acknowledge(QueueElement<V> qe) {
synchronized (mutex) {
- V elem = qe.getElement();
- if (qe.delete()) {
- if (!qe.acquired || !RELEASE_ON_ACQUISITION) {
- inputController.elementDispatched(elem);
- }
+ qe.delete();
+ //If the element wasn't acquired release space:
+ if (!qe.isAcquired()) {
+ sizeLimiter.remove(1, qe.getLimiterSize());
}
}
}
@@ -307,10 +304,10 @@
}
}
- public void shutdown() {
+ public void shutdown(boolean sync) {
+ super.shutdown(sync);
synchronized (mutex) {
- stop();
- sharedCursor.deactivate();
+ queue.shutdown(sync);
}
}
@@ -333,10 +330,7 @@
}
public void flowElemAccepted(ISourceController<V> source, V elem) {
- synchronized (mutex) {
- // TODO should change flow controller to pass original source:
- accepted(null, elem);
- }
+ throw new UnsupportedOperationException("Flow Controller pass-through not supported");
}
private final void accepted(ISourceController<?> source, V elem) {
@@ -394,7 +388,7 @@
// Process shared consumers:
if (!sharedConsumers.isEmpty()) {
QueueElement<V> next = sharedCursor.getNext();
-
+
if (next != null) {
// See if there are any interested consumers:
@@ -639,28 +633,32 @@
return ACCEPTED;
}
- // If the sub doesn't remove on dispatch set an ack listener:
+ // If the sub doesn't remove on dispatch pass it the callback
SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
// See if the sink has room:
+ qe.setAcquired(sub);
if (sub.offer(qe.elem, this, callback)) {
if (!sub.isBrowser()) {
- qe.setAcquired(true);
- if (RELEASE_ON_ACQUISITION) {
- inputController.elementDispatched(qe.getElement());
- }
+
+ sizeLimiter.remove(1, qe.getLimiterSize());
// If remove on dispatch acknowledge now:
if (callback == null) {
qe.acknowledge();
}
}
+ else
+ {
+ qe.setAcquired(null);
+ }
// Advance our cursor:
cursor.skip(qe);
return ACCEPTED;
} else {
+ qe.setAcquired(null);
// Remove from dispatch list until we are resumed:
if (DEBUG) {
System.out.println(this + " Declined: " + qe);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Fri May 29 15:20:12 2009
@@ -58,8 +58,6 @@
protected Mapper<K, V> keyMapper;
private long directs;
- private boolean started = false;
-
private final ISourceController<V> sourceControler = new ISourceController<V>() {
public Flow getFlow() {
@@ -146,19 +144,6 @@
}
}
- public synchronized void start() {
- if (!started) {
- started = true;
- if (isDispatchReady()) {
- super.notifyReady();
- }
- }
- }
-
- public synchronized void stop() {
- started = false;
- }
-
public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
// this queue is not persistent, so we can ignore this.
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Fri May 29 15:20:12 2009
@@ -68,6 +68,7 @@
private static final boolean USE_KAHA_DB = true;
private static final boolean PERSISTENT = false;
private static final boolean PURGE_STORE = true;
+ private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -77,7 +78,11 @@
protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
protected IDispatcher createDispatcher() {
- return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+ if (THREAD_POOL_SIZE > 1) {
+ return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, THREAD_POOL_SIZE);
+ } else {
+ return PriorityDispatcher.createPriorityDispatcher("TestDispatcher", MessageBroker.MAX_PRIORITY);
+ }
}
protected int consumerStartDelay = 0;
@@ -123,7 +128,7 @@
consumerStartDelay = 0;
}
- public void testSharedQueue_1_1_1() throws Exception {
+ public void test1_1_1() throws Exception {
startServices();
try {
createQueues(1);
@@ -136,7 +141,7 @@
}
}
- public void testSharedQueue_10_10_10() throws Exception {
+ public void test10_10_10() throws Exception {
startServices();
try {
createQueues(10);
@@ -149,7 +154,7 @@
}
}
- public void testSharedQueue_10_1_10() throws Exception {
+ public void test10_1_10() throws Exception {
startServices();
try {
createQueues(1);
@@ -162,7 +167,7 @@
}
}
- public void testSharedQueue_10_1_1() throws Exception {
+ public void test10_1_1() throws Exception {
startServices();
try {
createQueues(10);
@@ -175,7 +180,7 @@
}
}
- public void testSharedQueue_1_1_10() throws Exception {
+ public void test1_1_10() throws Exception {
startServices();
try {
createQueues(10);
@@ -237,6 +242,7 @@
private final void createQueues(int count) {
for (int i = 0; i < count; i++) {
IQueue<Long, MessageDelivery> queue = queueStore.createSharedQueue("queue-" + (i + 1));
+ queue.setDispatchPriority(1);
queues.add(queue);
}
}
@@ -403,6 +409,7 @@
this.name = name;
Flow flow = new Flow(name + "-outbound", false);
limiter = new SizeLimiter<MessageDelivery>(1024 * 1024, 512 * 1024) {
+ @Override
public int getElementSize(MessageDelivery m) {
return m.getFlowLimiterSize();
}
@@ -456,6 +463,7 @@
public boolean offer(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
if (controller.offer(element, source)) {
addInternal(element, source, callback);
+ return true;
}
return false;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Fri May 29 15:20:12 2009
@@ -45,6 +45,7 @@
final Flow flow = new Flow("client-"+name+"-inbound", false);
inputResumeThreshold = inputWindowSize/2;
WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, inputResumeThreshold) {
+ @Override
protected void sendCredit(int credit) {
MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);
write(ack);
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Fri May 29 15:20:12 2009
@@ -415,8 +415,8 @@
}
@Override
- public void remove(long size) {
- super.remove(size);
+ public void remove(int count, long size) {
+ super.remove(count, size);
if (!clientMode) {
available += size;
if (available >= capacity - resumeThreshold) {
@@ -430,7 +430,7 @@
public void onProtocolMessage(FlowControl m) {
synchronized (outputQueue) {
- remove(m.getCredit());
+ remove(1, m.getCredit());
}
}