You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/29 17:20:13 UTC

svn commit: r780014 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/activemq/...

Author: chirino
Date: Fri May 29 15:20:12 2009
New Revision: 780014

URL: http://svn.apache.org/viewvc?rev=780014&view=rev
Log:
Applying patch at https://issues.apache.org/activemq/browse/AMQ-2271
Thanks, Colin.

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri May 29 15:20:12 2009
@@ -35,8 +35,8 @@
         }*/
         
         @Override
-        public void remove(long size) {
-            super.remove(size);
+        public void remove(int count, long size) {
+            super.remove(count, size);
             if (!clientMode) {
                 available += size;
                 if (available >= capacity - resumeThreshold) {
@@ -51,7 +51,7 @@
         }
 
         public void onProtocolCredit(int credit) {
-            remove(credit);
+            remove(1, credit);
         }
 
         public int getElementSize(E m) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Fri May 29 15:20:12 2009
@@ -71,7 +71,7 @@
 
     private Mapper<Integer, MessageDelivery> partitionMapper;
 
-    private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
+    private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
     private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
     // Be default we don't page out elements to disk.
     private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
@@ -115,7 +115,7 @@
     private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
     private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
     // Be default we don't page out elements to disk.
-    private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+    private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
 
     private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
 
@@ -198,7 +198,7 @@
             ExclusivePersistentQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
             durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
             LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-            
+
         }
     }
 
@@ -253,10 +253,12 @@
     }
 
     public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
-        Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
-        ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
-        ret.addAll(c);
-        return ret;
+        synchronized (this) {
+            Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
+            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+            ret.addAll(c);
+            return ret;
+        }
     }
 
     public ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueue(String name) {
@@ -275,6 +277,15 @@
         return queue;
     }
 
+    public Collection<ExclusivePersistentQueue<Long, MessageDelivery>> getDurableQueues() {
+        synchronized (this) {
+            Collection<ExclusivePersistentQueue<Long, MessageDelivery>> c = durableQueues.values();
+            ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>> ret = new ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>>(c.size());
+            ret.addAll(c);
+            return ret;
+        }
+    }
+
     public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
 
         IQueue<Long, MessageDelivery> queue = null;
@@ -294,7 +305,7 @@
 
     private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
         ExclusivePersistentQueue<Long, MessageDelivery> queue;
-        
+
         SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
             @Override
             public int getElementSize(MessageDelivery elem) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Fri May 29 15:20:12 2009
@@ -32,15 +32,18 @@
     Queue(IQueue<Long, MessageDelivery> queue) {
         this.queue = queue;
     }
-    
 
-    /* (non-Javadoc)
-     * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
      */
     public void deliver(MessageDelivery message, ISourceController<?> source) {
         queue.add(message, source);
     }
-    
+
     public final void addSubscription(final Subscription<MessageDelivery> sub) {
         queue.addSubscription(sub);
     }
@@ -59,6 +62,12 @@
         }
     }
 
+    public void shutdown(boolean sync) throws Exception {
+        if (queue != null) {
+            queue.shutdown(sync);
+        }
+    }
+
     public boolean hasSelector() {
         return false;
     }
@@ -95,16 +104,24 @@
             this.queue = queue;
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+         * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
          */
         public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
             this.subscription = subscription;
             queue.addSubscription(subscription);
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+         * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
          */
         public void disconnect(Subscription<MessageDelivery> context) {
             queue.removeSubscription(subscription);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Fri May 29 15:20:12 2009
@@ -22,6 +22,7 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.AbstractFlowQueue;
 import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.IQueue;
 
@@ -95,8 +96,13 @@
             return;
         }
         for (Queue queue : queues.values()) {
-            queue.stop();
+            queue.shutdown(true);
         }
+        
+        for (AbstractFlowQueue<MessageDelivery> queue : queueStore.getDurableQueues()) {
+            queue.shutdown(true);
+        }
+        
         started = false;
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri May 29 15:20:12 2009
@@ -427,6 +427,7 @@
 
                 selector = parseSelector(info);
             limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+                @Override
                 public int getElementSize(MessageDelivery m) {
                     return m.getFlowLimiterSize();
                 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri May 29 15:20:12 2009
@@ -304,6 +304,7 @@
             if (ackMode != StompSubscription.AUTO_ACK) {
                 Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
                 limiter = new WindowLimiter<MessageDelivery>(true, flow, connection.getOutputWindowSize(), connection.getOutputResumeThreshold()) {
+                    @Override
                     public int getElementSize(MessageDelivery m) {
                         return m.getFlowLimiterSize();
                     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Fri May 29 15:20:12 2009
@@ -371,7 +371,7 @@
                         }
 
                         synchronized (opQueue) {
-                            this.storeLimiter.remove(release);
+                            this.storeLimiter.remove(1, release);
                         }
                     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri May 29 15:20:12 2009
@@ -185,7 +185,7 @@
         Thread joinThread = null;
         synchronized (this) {
             if (thread != null) {
-                dispatch(new RunnableAdapter() {
+                dispatchInternal(new RunnableAdapter() {
                     public void run() {
                         running = false;
                     }
@@ -215,8 +215,10 @@
 
     public void run() {
 
-        // Inform the dispatcher that we have started:
-        pooledDispatcher.onDispatcherStarted((D) this);
+        if (pooledDispatcher != null) {
+            // Inform the dispatcher that we have started:
+            pooledDispatcher.onDispatcherStarted((D) this);
+        }
 
         PriorityDispatchContext pdc;
         try {
@@ -288,7 +290,9 @@
         } catch (Throwable thrown) {
             thrown.printStackTrace();
         } finally {
-            pooledDispatcher.onDispatcherStopped((D) this);
+            if (pooledDispatcher != null) {
+                pooledDispatcher.onDispatcherStopped((D) this);
+            }
             cleanup();
         }
     }
@@ -368,6 +372,14 @@
         return true;
     }
 
+    //Special dispatch method that allow high priority dispatch:
+    private final void dispatchInternal(Dispatchable dispatchable, int priority)
+    {
+        PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+        context.priority = priority;
+        context.requestDispatch();
+    }
+    
     /*
      * (non-Javadoc)
      * 
@@ -424,7 +436,14 @@
     }
 
     private final D getCurrentDispatcher() {
-        return pooledDispatcher.getCurrentDispatcher();
+        if (pooledDispatcher != null) {
+            return pooledDispatcher.getCurrentDispatcher();
+        } else if (Thread.currentThread() == thread) {
+            return (D) this;
+        } else {
+            return null;
+        }
+
     }
 
     private final PooledDispatchContext<D> getCurrentDispatchContext() {
@@ -460,7 +479,7 @@
             this.dispatchable = dispatchable;
             this.name = name;
             this.currentOwner = (D) PriorityDispatcher.this;
-            if (persistent) {
+            if (persistent && pooledDispatcher != null) {
                 this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
             } else {
                 this.tracker = null;
@@ -549,7 +568,9 @@
             if (closed) {
                 return;
             }
-
+            
+            priority = Math.min(priority, MAX_USER_PRIORITY);
+            
             if (this.priority == priority) {
                 return;
             }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Fri May 29 15:20:12 2009
@@ -16,52 +16,107 @@
  */
 package org.apache.activemq.dispatch;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 
 public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
 
-    private final boolean DEBUG = false;
+    private final boolean DEBUG = true;
+
+    //TODO: Added plumbing for periodic rebalancing which we should
+    //consider implementing
+    private static final boolean ENABLE_UPDATES = false;
+    private final ArrayList<D> dispatchers = new ArrayList<D>();
+
+    private AtomicBoolean running = new AtomicBoolean(false);
+    private boolean needsUpdate = false;
+    private static final int FREQUENCY = 5000;
+    private boolean scheduled = false;
+    private final Runnable timerCallback;
 
     public SimpleLoadBalancer() {
-    }
 
-    @SuppressWarnings("hiding")
-    private class ExecutionStats<D extends IDispatcher> {
-        final PooledDispatchContext<D> target;
-        final PooledDispatchContext<D> source;
-        int count;
+        timerCallback = new Runnable() {
+            public final void run() {
+                if (running.get()) {
+                    rebalance();
+                    synchronized (dispatchers) {
+                        scheduled = false;
+                        scheduleNext();
+                    }
+                }
+            }
+        };
 
-        ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
-            this.target = target;
-            this.source = source;
-        }
+    }
 
-        public String toString() {
-            return "Connection from: " + source + " to " + target;
+    private void rebalance() {
+        if (!needsUpdate) {
+            return;
         }
+        // TODO Auto-generated method stub
     }
 
-    public void onDispatcherStarted(D dispatcher) {
-
+    public void start() {
+        if (running.compareAndSet(false, true)) {
+            scheduleNext();
+        }
     }
 
-    public void onDispatcherStopped(D dispatcher) {
+    private void scheduleNext() {
+        if (!ENABLE_UPDATES) {
+            return;
+        }
+        synchronized (dispatchers) {
+            if (!scheduled) {
+                if (!dispatchers.isEmpty()) {
+                    dispatchers.get(0).schedule(timerCallback, FREQUENCY, TimeUnit.MILLISECONDS);
+                    scheduled = true;
+                }
+            }
+        }
+    }
 
+    public void stop() {
+        running.compareAndSet(true, false);
     }
 
-    public void start() {
+    public synchronized final void onDispatcherStarted(D dispatcher) {
+        dispatchers.add(dispatcher);
+        scheduleNext();
     }
 
-    public void stop() {
+    /**
+     * A Dispatcher must call this when exiting it's dispatch loop
+     */
+    public void onDispatcherStopped(D dispatcher) {
+        dispatchers.remove(dispatcher);
     }
-    
+
     public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
         return new SimpleExecutionTracker(context);
     }
 
+    private static class ExecutionStats<D extends IDispatcher> {
+        final PooledDispatchContext<D> target;
+        final PooledDispatchContext<D> source;
+        int count;
+
+        ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
+            this.target = target;
+            this.source = source;
+        }
+
+        public String toString() {
+            return "Connection from: " + source + " to " + target;
+        }
+    }
+
     private class SimpleExecutionTracker implements ExecutionTracker<D> {
         private final HashMap<PooledDispatchContext<D>, ExecutionStats<D>> sources = new HashMap<PooledDispatchContext<D>, ExecutionStats<D>>();
         private final PooledDispatchContext<D> context;
@@ -102,6 +157,7 @@
                     if (singleSource == null && sources.isEmpty()) {
                         singleSource = callingContext;
                         ExecutionStats<D> stats = new ExecutionStats<D>(callingContext, context);
+                        stats.count++;
                         sources.put(callingContext, stats);
 
                         // If this context only has a single source
@@ -121,6 +177,7 @@
                             stats = new ExecutionStats<D>(callingContext, context);
                             sources.put(callingContext, stats);
                         }
+                        stats.count++;
 
                         if (singleSource != null) {
                             singleSource = null;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSizeLimiter.java Fri May 29 15:20:12 2009
@@ -24,6 +24,8 @@
 
     public boolean add(int count, long size);
     
+    public void remove(int count, long size);
+    
     public long getCapacity();
     
     public long getSize();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java Fri May 29 15:20:12 2009
@@ -40,7 +40,7 @@
 
     private class Priority extends AbstractLimiter<E> implements IFlowSizeLimiter<E> {
         final int priority;
-        int size;
+        long size;
         int reserved;
         private boolean throttled;
 
@@ -99,16 +99,16 @@
             if (reserved > 0) {
                 int res = reserved;
                 reserved = 0;
-                remove(res);
+                remove(1, res);
             }
         }
 
         public void remove(E elem) {
             int size = sizeMapper.map(elem);
-            remove(size);
+            remove(1, size);
         }
 
-        protected void remove(int s) {
+        public void remove(int c, long s) {
             size -= s;
             totalSize -= s;
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java Fri May 29 15:20:12 2009
@@ -44,7 +44,7 @@
     }
 
     public final void remove(E elem) {
-        remove(getElementSize(elem));
+        remove(1, getElementSize(elem));
     }
 
     public void reserve(E elem) {
@@ -55,11 +55,11 @@
         if (reserved > 0) {
             long res = reserved;
             reserved = 0;
-            remove(res);
+            remove(1, res);
         }
     }
 
-    public void remove(long s) {
+    public void remove(int count, long s) {
         this.size -= s;
         if (size < 0) {
             Exception ie = new IllegalStateException("Size Negative!" + size);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Fri May 29 15:20:12 2009
@@ -45,6 +45,7 @@
             thrown.printStackTrace();
         }
     };
+    protected boolean started;
 
     AbstractFlowQueue() {
         super();
@@ -80,6 +81,37 @@
         return this;
     }
 
+    public synchronized void start() {
+        if (!started) {
+            started = true;
+            if (isDispatchReady()) {
+                notifyReady();
+            }
+        }
+    }
+
+    public synchronized void stop() {
+        started = false;
+    }
+    
+    /**
+     * Calls stop and cleans up resources associated with the queue.
+     * @param sync
+     */
+    public void shutdown(boolean sync) {
+        stop();
+        DispatchContext dc = null;
+        synchronized (this) {
+            dc = dispatchContext;
+            dispatchContext = null;
+            
+        }
+        
+        if (dc != null) {
+            dc.close(sync);
+        }
+    }
+
     /**
      * Sets an asynchronous dispatcher for this source. As elements become
      * available they will be dispatched to the worker pool.
@@ -143,7 +175,7 @@
             if (readyListeners == null) {
                 return;
             }
-            
+
             if (!readyListeners.isEmpty()) {
                 for (FlowReadyListener<E> listener : readyListeners) {
                     listener.onFlowReady(this);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java Fri May 29 15:20:12 2009
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.queue;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -61,9 +61,9 @@
     private final Expirator expirator;
     private final QueueStore<?, V> queueStore;
     private final ElementLoader loader;
-    public final QueueDescriptor queueDescriptor;
+    private final QueueDescriptor queueDescriptor;
     private final Object mutex;
-    
+
     public CursoredQueue(PersistencePolicy<V> persistencePolicy, Mapper<Long, V> expirationMapper, Flow flow, QueueDescriptor queueDescriptor, QueueStore<?, V> store, Object mutex) {
         this.persistencePolicy = persistencePolicy;
         this.mutex = mutex;
@@ -197,7 +197,8 @@
     }
 
     /**
-     * @return The first sequence number in the queue (or the next sequence number if it is empty)
+     * @return The first sequence number in the queue (or the next sequence
+     *         number if it is empty)
      */
     public long getFirstSequence() {
         if (queue.isEmpty()) {
@@ -278,8 +279,7 @@
 
         private boolean paused;
 
-        public Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements,
-                IFlowController<QueueElement<V>> memoryController) {
+        public Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements, IFlowController<QueueElement<V>> memoryController) {
             this.name = name;
             this.queue = queue.queue;
             this.loader = queue.loader;
@@ -336,13 +336,11 @@
             return !paused && activated && pageInElements;
         }
 
-        public void close()
-        {
+        public void close() {
             deactivate();
             cQueue.openCursors.remove(this);
         }
-        
-        
+
         public final void reset(long sequence) {
             updateSequence(sequence);
             updateCurrent(null);
@@ -695,8 +693,9 @@
         }
 
         /**
-         * @param l Set the highest sequence number to which this 
-         * cursor can advance.
+         * @param l
+         *            Set the highest sequence number to which this cursor can
+         *            advance.
          */
         public void setLimit(long l) {
             limit = l;
@@ -710,7 +709,7 @@
         final CursoredQueue<V> queue;
 
         V elem;
-        int size = -1;
+        private int size = -1;
         long expiration = -1;
         boolean redelivered = false;
 
@@ -730,8 +729,8 @@
         // Indicates whether the element has been saved in the store.
         boolean saved = false;
 
-        boolean deleted = false;
-        boolean acquired = false;
+        private boolean deleted = false;
+        private Subscription<V> owner = null;
 
         public QueueElement(V elem, long sequence, CursoredQueue<V> queue) {
             this.elem = elem;
@@ -764,6 +763,10 @@
             return sequence;
         }
 
+        public final int getLimiterSize() {
+            return size;
+        }
+
         public final void addHardRef() {
             hardRefs++;
             // Page in the element (providing it wasn't removed):
@@ -799,8 +802,8 @@
             assert softRefs >= 0;
         }
 
-        public final void setAcquired(boolean val) {
-            this.acquired = val;
+        public final void setAcquired(Subscription<V> owner) {
+            this.owner = owner;
         }
 
         public final void acknowledge() {
@@ -825,7 +828,7 @@
         }
 
         public final void unacquire(ISourceController<?> source) {
-            acquired = false;
+            owner = null;
             if (isExpired()) {
                 acknowledge();
             } else {
@@ -843,7 +846,7 @@
             // Don't page out of there is a hard ref to the element
             // or if it is acquired (since we need the element
             // during delete:
-            if (!deleted && (hardRefs > 0 || acquired)) {
+            if (!deleted && (hardRefs > 0 || isAcquired())) {
                 return;
             }
 
@@ -854,7 +857,7 @@
                     if (!queue.getPersistencePolicy().isPersistent(elem)) {
                         save(controller, true);
                         if (DEBUG)
-                            System.out.println("Paged out element: " + this);
+                            System.out.println("Paging out non-pers element: " + this);
                     }
 
                     // If save is pending don't unload until the save has
@@ -864,6 +867,8 @@
                     }
                 }
 
+                if (DEBUG)
+                    System.out.println("Paged out element: " + this);
                 elem = null;
             }
 
@@ -886,7 +891,7 @@
                 // Otherwise as long as the element isn't acquired we can unload
                 // it. If it is acquired we keep the soft ref arount to remember
                 // that it is.
-                else if (!acquired && queue.getLoader().isPageOutPlaceHolders()) {
+                else if (!isAcquired() && queue.getLoader().isPageOutPlaceHolders()) {
 
                     loaded = false;
 
@@ -1008,7 +1013,11 @@
         }
 
         public final boolean isAcquired() {
-            return acquired || deleted;
+            return owner != null;
+        }
+
+        public Subscription<V> getOwner() {
+            return owner;
         }
 
         public final long getExpiration() {
@@ -1099,7 +1108,7 @@
         }
 
         public String toString() {
-            return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " aquired: " + acquired;
+            return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
         }
 
     }
@@ -1344,7 +1353,7 @@
             if (!persistencePolicy.isPagingEnabled()) {
                 qe.addHardRef();
             }
-            
+
             // Persist the element if required:
             if (persistencePolicy.isPersistent(qe.elem)) {
                 // For now base decision on whether to delay flush on
@@ -1353,10 +1362,10 @@
                 boolean delayable = !openCursors.isEmpty();
                 qe.save(source, delayable);
             }
-            
+
             // Check with cursors to see if any of them have room for it
             // in memory:
-            if(persistencePolicy.isPagingEnabled()) {
+            if (persistencePolicy.isPagingEnabled()) {
 
                 // Otherwise check with any other open cursor to see if
                 // it can hang on to the element:
@@ -1393,8 +1402,6 @@
                     qe.unload(source);
                 }
             }
-            
-            
 
         }
 
@@ -1444,7 +1451,7 @@
         public void releaseBlock(Cursor<V> cursor, long block) {
             HashSet<Cursor<V>> cursors = reservedBlocks.get(block);
             if (cursors == null) {
-                if (true || DEBUG)
+                if (DEBUG)
                     System.out.println(this + " removeBlockInterest " + block + ", no cursors" + cursor);
             } else {
                 if (cursors.remove(cursor)) {
@@ -1560,4 +1567,17 @@
             return "QueueLoader " + CursoredQueue.this;
         }
     }
+
+    /**
+     * Stops the queue and closes every cursor that is still open.
+     * 
+     * @param sync not consulted by this method.
+     */
+    public void shutdown(boolean sync) {
+        stop();
+        // Snapshot first: Cursor.close() removes the cursor from openCursors.
+        for (Cursor<V> cursor : new ArrayList<Cursor<V>>(openCursors)) {
+            cursor.close();
+        }
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Fri May 29 15:20:12 2009
@@ -35,7 +35,6 @@
     private CursoredQueue<E> queue;
     private final FlowController<E> controller;
     private final IFlowSizeLimiter<E> limiter;
-    private boolean started = true;
     private Cursor<E> cursor;
     private final QueueDescriptor queueDescriptor;
     private PersistencePolicy<E> persistencePolicy;
@@ -123,7 +122,7 @@
                 synchronized (ExclusivePersistentQueue.this) {
                     E elem = qe.getElement();
                     if (qe.delete()) {
-                        if (!qe.acquired) {
+                        if (!qe.isAcquired()) {
                             controller.elementDispatched(elem);
                         }
                     }
@@ -147,8 +146,9 @@
         FlowController<QueueElement<E>> memoryController = null;
         if (persistencePolicy.isPagingEnabled()) {
             IFlowSizeLimiter<QueueElement<E>> limiter = new SizeLimiter<QueueElement<E>>(persistencePolicy.getPagingInMemorySize(), persistencePolicy.getPagingInMemorySize() / 2) {
+                @Override
                 public int getElementSize(QueueElement<E> qe) {
-                    return qe.size;
+                    return qe.getLimiterSize();
                 };
             };
 
@@ -232,21 +232,25 @@
             throw new IllegalStateException("Not initialized");
         }
         if (!started) {
-            started = true;
-            if (isDispatchReady()) {
-                notifyReady();
-            }
+            super.start();
             queue.start();
         }
     }
 
     public synchronized void stop() {
         if (started) {
-            started = false;
+            super.stop();
             queue.stop();
         }
     }
 
+    public void shutdown(boolean sync) {
+        super.shutdown(sync);
+        synchronized (this) {
+            queue.shutdown(sync);
+        }
+    }
+
     public FlowController<E> getFlowController(Flow flow) {
         return controller;
     }
@@ -272,13 +276,15 @@
                 SubscriptionDeliveryCallback callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
 
                 // See if the sink has room:
+                qe.setAcquired(subscription);
                 if (subscription.offer(qe.elem, sourceController, callback)) {
-                    qe.setAcquired(true);
                     controller.elementDispatched(qe.getElement());
                     // If remove on dispatch acknowledge now:
                     if (callback == null) {
                         qe.acknowledge();
                     }
+                } else {
+                    qe.setAcquired(null);
                 }
             }
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Fri May 29 15:20:12 2009
@@ -27,7 +27,6 @@
 public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
     private final LinkedList<E> queue = new LinkedList<E>();
     private final FlowController<E> controller;
-    private boolean started = true;
 
     /**
      * Creates a flow queue that can handle multiple flows.
@@ -57,14 +56,6 @@
         }
     }
 
-    public synchronized void start() {
-        started = true;
-    }
-
-    public synchronized void stop() {
-        started = false;
-    }
-
     public FlowController<E> getFlowController(Flow flow) {
         return controller;
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Fri May 29 15:20:12 2009
@@ -21,7 +21,7 @@
 import org.apache.activemq.queue.QueueStore.PersistentQueue;
 import org.apache.activemq.util.Mapper;
 
-public interface IQueue<K, V> extends IFlowSink<V>, PersistentQueue<K,V>{
+public interface IQueue<K, V> extends IFlowSink<V>, PersistentQueue<K, V> {
 
     /**
      * @return the number of elements currently held by the queue.
@@ -51,14 +51,14 @@
     public boolean removeSubscription(Subscription<V> sub);
 
     /**
-     * Sets a mapper returning the expiration time for elements in this 
-     * queue. A positive value indicates that the message has an expiration
-     * time. 
+     * Sets a mapper returning the expiration time for elements in this queue. A
+     * positive value indicates that the message has an expiration time.
      * 
-     * @param expirationMapper The expiration mapper.
+     * @param expirationMapper
+     *            The expiration mapper.
      */
     public void setExpirationMapper(Mapper<Long, V> expirationMapper);
-    
+
     /**
      * Sets the dispatcher for the queue.
      * 
@@ -68,6 +68,18 @@
     public void setDispatcher(IDispatcher dispatcher);
 
     /**
+     * Sets the base dispatch priority for the queue. Setting to higher value
+     * will increase the preference with which the dispatcher dispatches the
+     * queue. If the queue itself is priority based, the queue may further
+     * increase its dispatch priority based on the priority of elements that it
+     * holds.
+     * 
+     * @param priority
+     *            The base priority for the queue
+     */
+    public void setDispatchPriority(int priority);
+
+    /**
      * Starts the queue.
      */
     public void start();
@@ -81,4 +93,17 @@
      */
     public void stop();
 
+    /**
+     * The queue is stopped via {@link #stop()} then shutdown. Once shutdown an
+     * {@link IQueue} cannot be restarted. Attempts to manipulate the queue once
+     * the queue is shutdown will throw an {@link IllegalStateException} unless
+     * otherwise documented.
+     * 
+     * @param sync
+     *            If true will cause the calling thread to block until all
+     *            resources held by the queue are cleaned up. Otherwise, the
+     *            queue shutdown will proceed asynchronously.
+     */
+    public void shutdown(boolean sync);
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri May 29 15:20:12 2009
@@ -21,6 +21,7 @@
 import java.util.HashSet;
 
 import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
@@ -35,7 +36,9 @@
     private QueueStore<K, V> store;
     protected IDispatcher dispatcher;
     private boolean started;
+    private boolean shutdown = false;
     protected QueueStore.QueueDescriptor queueDescriptor;
+    private int basePriority = 0;
 
     public PartitionedQueue(String name) {
         super(name);
@@ -51,6 +54,7 @@
     public IQueue<K, V> getPartition(int partitionKey) {
         boolean save = false;
         IQueue<K, V> rc = null;
+        checkShutdown();
         synchronized (partitions) {
             rc = partitions.get(partitionKey);
             if (rc == null) {
@@ -68,19 +72,38 @@
         return rc;
     }
 
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.queue.IQueue#setDispatchPriority(int)
+     */
+    public void setDispatchPriority(int priority) {
+        synchronized (this) {
+            if (basePriority != priority) {
+                basePriority = priority;
+                if (!shutdown) {
+                    for (IQueue<K, V> queue : partitions.values()) {
+                        queue.setDispatchPriority(basePriority);
+                    }
+                }
+            }
+        }
+    }
+
     public int getEnqueuedCount() {
+        checkShutdown();
         synchronized (partitions) {
+
             int count = 0;
             for (IQueue<K, V> queue : partitions.values()) {
-                if (queue != null) {
-                    count += queue.getEnqueuedCount();
-                }
+                count += queue.getEnqueuedCount();
             }
             return count;
         }
     }
 
     public synchronized long getEnqueuedSize() {
+        checkShutdown();
         synchronized (partitions) {
             long size = 0;
             for (IQueue<K, V> queue : partitions.values()) {
@@ -107,10 +130,12 @@
     abstract public IQueue<K, V> createPartition(int partitionKey);
 
     public void addPartition(int partitionKey, IQueue<K, V> queue) {
+        checkShutdown();
         synchronized (partitions) {
             partitions.put(partitionKey, queue);
             for (Subscription<V> sub : subscriptions) {
                 queue.addSubscription(sub);
             }
+            queue.setDispatchPriority(basePriority); // once per partition, even when there are no subscriptions
         }
     }
@@ -124,6 +149,7 @@
 
     public synchronized void start() {
         if (!started) {
+            checkShutdown();
             started = true;
             for (IQueue<K, V> partition : partitions.values()) {
                 if (partition != null)
@@ -142,7 +168,27 @@
         }
     }
 
+    public void shutdown(boolean sync) {
+        HashMap<Integer, IQueue<K, V>> partitions = null;
+        synchronized (this) {
+            if (!shutdown) {
+                shutdown = true;
+                started = false;
+            }
+            partitions = this.partitions;
+            this.partitions = null;
+        }
+
+        if (partitions != null) {
+            for (IQueue<K, V> partition : partitions.values()) {
+                if (partition != null)
+                    partition.shutdown(sync);
+            }
+        }
+    }
+
     public void addSubscription(Subscription<V> sub) {
+        checkShutdown();
         synchronized (partitions) {
             subscriptions.add(sub);
             Collection<IQueue<K, V>> values = partitions.values();
@@ -153,6 +199,7 @@
     }
 
     public boolean removeSubscription(Subscription<V> sub) {
+        checkShutdown();
         synchronized (partitions) {
             if (subscriptions.remove(sub)) {
                 Collection<IQueue<K, V>> values = partitions.values();
@@ -186,6 +233,7 @@
     }
 
     public void setDispatcher(IDispatcher dispatcher) {
+        checkShutdown();
         this.dispatcher = dispatcher;
         synchronized (partitions) {
             Collection<IQueue<K, V>> values = partitions.values();
@@ -194,4 +242,10 @@
             }
         }
     }
+
+    private void checkShutdown() {
+        if (shutdown) {
+            throw new IllegalStateException(this + " is shutdown");
+        }
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Fri May 29 15:20:12 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.queue;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 
 import org.apache.activemq.dispatch.IDispatcher;
@@ -41,6 +42,8 @@
     private boolean started;
     private QueueStore.QueueDescriptor queueDescriptor;
     private Mapper<Long, V> expirationMapper;
+    private int basePriority = 0;
+    private boolean shutdown = false;
 
     public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
         super(name);
@@ -56,6 +59,7 @@
 
     public synchronized void start() {
         if (!started) {
+            checkShutdown();
             started = true;
             for (SharedQueue<K, V> partition : partitions) {
                 if (partition != null)
@@ -74,7 +78,26 @@
         }
     }
 
+    public void shutdown(boolean sync) {
+        ArrayList<SharedQueue<K, V>> partitions = null;
+        synchronized (this) {
+            if (!shutdown) {
+                shutdown = true;
+                started = false;
+            }
+            partitions = this.partitions;
+        }
+
+        if (partitions != null) {
+            for (IQueue<K, V> partition : partitions) {
+                if (partition != null)
+                    partition.shutdown(sync);
+            }
+        }
+    }
+
     public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+        checkShutdown();
         // No-op, only partitions should have stored values.
         if (count > 0 || size > 0) {
             throw new IllegalArgumentException("Partioned queues do not themselves hold values");
@@ -94,6 +117,7 @@
     }
 
     public synchronized int getEnqueuedCount() {
+        checkShutdown();
         int count = 0;
         for (SharedQueue<K, V> queue : partitions) {
             if (queue != null) {
@@ -125,6 +149,7 @@
 
     public void addSubscription(Subscription<V> sub) {
         synchronized (this) {
+            checkShutdown();
             subscriptions.add(sub);
             for (SharedQueue<K, V> queue : partitions) {
                 if (queue != null) {
@@ -148,18 +173,41 @@
         return false;
     }
 
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.queue.IQueue#setDispatchPriority(int)
+     */
+    public void setDispatchPriority(int priority) {
+        synchronized (this) {
+            if (basePriority != priority) {
+                basePriority = priority;
+                if (shutdown) {
+                    return;
+                }
+                for (int i = 0; i < limiter.getPriorities(); i++) {
+                    SharedQueue<K, V> queue = partitions.get(i);
+                    if (queue != null) {
+                        queue.setDispatchPriority(basePriority + i);
+                    }
+                }
+            }
+        }
+    }
+
     public IQueue<K, V> createPartition(int prio) {
         return getPartition(prio, false);
     }
 
     private IQueue<K, V> getPartition(int prio, boolean initialize) {
         synchronized (this) {
+            checkShutdown();
             SharedQueue<K, V> queue = partitions.get(prio);
             if (queue == null) {
                 queue = new SharedQueue<K, V>(getResourceName() + "$" + prio, limiter.getPriorityLimter(prio), this);
                 queue.setAutoRelease(autoRelease);
                 queue.setDispatcher(dispatcher);
-                queue.setDispatchPriority(prio);
+                queue.setDispatchPriority(basePriority + prio);
                 queue.setKeyMapper(keyMapper);
                 queue.setStore(store);
                 queue.setPersistencePolicy(persistencePolicy);
@@ -172,8 +220,7 @@
                     queue.initialize(0, 0, 0, 0);
                     onFlowOpened(queue.getFlowControler());
                 }
-                
-                
+
                 if (started) {
                     queue.start();
                 }
@@ -213,4 +260,11 @@
         this.dispatcher = dispatcher;
         super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
     }
+
+    private void checkShutdown() {
+        if (shutdown) {
+            throw new IllegalStateException(this + " is shutdown");
+        }
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Fri May 29 15:20:12 2009
@@ -56,7 +56,6 @@
     // Limiter/Controller for the size of the queue:
     private FlowController<V> inputController;
     private final IFlowSizeLimiter<V> sizeLimiter;
-    private final boolean RELEASE_ON_ACQUISITION = true;
 
     private final QueueStore.QueueDescriptor queueDescriptor;
 
@@ -85,7 +84,6 @@
     private final LinkedNodeList<SubscriptionContext> trailingConsumers = new LinkedNodeList<SubscriptionContext>();
 
     private boolean initialized = false;
-    private boolean started = false;
 
     private Mapper<Long, V> expirationMapper;
 
@@ -149,7 +147,6 @@
 
                     @Override
                     protected int getElementSize(V elem) {
-                        // TODO Auto-generated method stub
                         return sizeLimiter.getElementSize(elem);
                     }
 
@@ -215,8 +212,9 @@
         FlowController<QueueElement<V>> controller = null;
         if (pageInElements && persistencePolicy.isPagingEnabled() && sizeLimiter.getCapacity() > persistencePolicy.getPagingInMemorySize()) {
             IFlowSizeLimiter<QueueElement<V>> limiter = new SizeLimiter<QueueElement<V>>(persistencePolicy.getPagingInMemorySize(), persistencePolicy.getPagingInMemorySize() / 2) {
+                @Override
                 public int getElementSize(QueueElement<V> qe) {
-                    return qe.size;
+                    return qe.getLimiterSize();
                 };
             };
 
@@ -264,11 +262,10 @@
 
     final void acknowledge(QueueElement<V> qe) {
         synchronized (mutex) {
-            V elem = qe.getElement();
-            if (qe.delete()) {
-                if (!qe.acquired || !RELEASE_ON_ACQUISITION) {
-                    inputController.elementDispatched(elem);
-                }
+            qe.delete();
+            //If the element wasn't acquired release space:
+            if (!qe.isAcquired()) {
+                sizeLimiter.remove(1, qe.getLimiterSize());
             }
         }
     }
@@ -307,10 +304,10 @@
         }
     }
 
-    public void shutdown() {
+    public void shutdown(boolean sync) {
+        super.shutdown(sync);
         synchronized (mutex) {
-            stop();
-            sharedCursor.deactivate();
+            queue.shutdown(sync);
         }
     }
 
@@ -333,10 +330,7 @@
     }
 
     public void flowElemAccepted(ISourceController<V> source, V elem) {
-        synchronized (mutex) {
-            // TODO should change flow controller to pass original source:
-            accepted(null, elem);
-        }
+        throw new UnsupportedOperationException("Flow Controller pass-through not supported");
     }
 
     private final void accepted(ISourceController<?> source, V elem) {
@@ -394,7 +388,7 @@
             // Process shared consumers:
             if (!sharedConsumers.isEmpty()) {
                 QueueElement<V> next = sharedCursor.getNext();
-                
+
                 if (next != null) {
 
                     // See if there are any interested consumers:
@@ -639,28 +633,32 @@
                 return ACCEPTED;
             }
 
-            // If the sub doesn't remove on dispatch set an ack listener:
+            // If the sub doesn't remove on dispatch pass it the callback
             SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
 
             // See if the sink has room:
+            qe.setAcquired(sub);
             if (sub.offer(qe.elem, this, callback)) {
                 if (!sub.isBrowser()) {
-                    qe.setAcquired(true);
-                    if (RELEASE_ON_ACQUISITION) {
-                        inputController.elementDispatched(qe.getElement());
-                    }
+
+                    sizeLimiter.remove(1, qe.getLimiterSize());
 
                     // If remove on dispatch acknowledge now:
                     if (callback == null) {
                         qe.acknowledge();
                     }
                 }
+                else
+                {
+                    qe.setAcquired(null);
+                }
 
                 // Advance our cursor:
                 cursor.skip(qe);
 
                 return ACCEPTED;
             } else {
+                qe.setAcquired(null);
                 // Remove from dispatch list until we are resumed:
                 if (DEBUG) {
                     System.out.println(this + " Declined: " + qe);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Fri May 29 15:20:12 2009
@@ -58,8 +58,6 @@
     protected Mapper<K, V> keyMapper;
     private long directs;
 
-    private boolean started = false;
-
     private final ISourceController<V> sourceControler = new ISourceController<V>() {
 
         public Flow getFlow() {
@@ -146,19 +144,6 @@
         }
     }
 
-    public synchronized void start() {
-        if (!started) {
-            started = true;
-            if (isDispatchReady()) {
-                super.notifyReady();
-            }
-        }
-    }
-
-    public synchronized void stop() {
-        started = false;
-    }
-
     public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
         // this queue is not persistent, so we can ignore this.
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Fri May 29 15:20:12 2009
@@ -68,6 +68,7 @@
     private static final boolean USE_KAHA_DB = true;
     private static final boolean PERSISTENT = false;
     private static final boolean PURGE_STORE = true;
+    private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
     protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -77,7 +78,11 @@
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
     protected IDispatcher createDispatcher() {
-        return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+        if (THREAD_POOL_SIZE > 1) {
+            return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, THREAD_POOL_SIZE);
+        } else {
+            return PriorityDispatcher.createPriorityDispatcher("TestDispatcher", MessageBroker.MAX_PRIORITY);
+        }
     }
 
     protected int consumerStartDelay = 0;
@@ -123,7 +128,7 @@
         consumerStartDelay = 0;
     }
 
-    public void testSharedQueue_1_1_1() throws Exception {
+    public void test1_1_1() throws Exception {
         startServices();
         try {
             createQueues(1);
@@ -136,7 +141,7 @@
         }
     }
 
-    public void testSharedQueue_10_10_10() throws Exception {
+    public void test10_10_10() throws Exception {
         startServices();
         try {
             createQueues(10);
@@ -149,7 +154,7 @@
         }
     }
 
-    public void testSharedQueue_10_1_10() throws Exception {
+    public void test10_1_10() throws Exception {
         startServices();
         try {
             createQueues(1);
@@ -162,7 +167,7 @@
         }
     }
 
-    public void testSharedQueue_10_1_1() throws Exception {
+    public void test10_1_1() throws Exception {
         startServices();
         try {
             createQueues(10);
@@ -175,7 +180,7 @@
         }
     }
 
-    public void testSharedQueue_1_1_10() throws Exception {
+    public void test1_1_10() throws Exception {
         startServices();
         try {
             createQueues(10);
@@ -237,6 +242,7 @@
     private final void createQueues(int count) {
         for (int i = 0; i < count; i++) {
             IQueue<Long, MessageDelivery> queue = queueStore.createSharedQueue("queue-" + (i + 1));
+            queue.setDispatchPriority(1);
             queues.add(queue);
         }
     }
@@ -403,6 +409,7 @@
             this.name = name;
             Flow flow = new Flow(name + "-outbound", false);
             limiter = new SizeLimiter<MessageDelivery>(1024 * 1024, 512 * 1024) {
+                @Override
                 public int getElementSize(MessageDelivery m) {
                     return m.getFlowLimiterSize();
                 }
@@ -456,6 +463,7 @@
         public boolean offer(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
             if (controller.offer(element, source)) {
                 addInternal(element, source, callback);
+                return true;
             }
             return false;
         }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Fri May 29 15:20:12 2009
@@ -45,6 +45,7 @@
         final Flow flow = new Flow("client-"+name+"-inbound", false);
         inputResumeThreshold = inputWindowSize/2;
         WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, inputResumeThreshold) {
+            @Override
             protected void sendCredit(int credit) {
                 MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);
                 write(ack);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=780014&r1=780013&r2=780014&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Fri May 29 15:20:12 2009
@@ -415,8 +415,8 @@
         }
 
         @Override
-        public void remove(long size) {
-            super.remove(size);
+        public void remove(int count, long size) {
+            super.remove(count, size);
             if (!clientMode) {
                 available += size;
                 if (available >= capacity - resumeThreshold) {
@@ -430,7 +430,7 @@
 
         public void onProtocolMessage(FlowControl m) {
             synchronized (outputQueue) {
-                remove(m.getCredit());
+                remove(1, m.getCredit());
             }
         }