You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/14 15:41:15 UTC

svn commit: r774764 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/act...

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Thu May 14 13:41:14 2009
@@ -69,11 +69,11 @@
     private static final int NO_MATCH = 1;
     private static final int DECLINED = 2;
 
-    private final SortedLinkedList<QueueElement> queue = new SortedLinkedList<QueueElement>();
+    private final SortedLinkedList<QueueElement<V>> queue = new SortedLinkedList<QueueElement<V>>();
     private Mapper<K, V> keyMapper;
 
     private final ElementLoader loader;
-    private Cursor sharedCursor;
+    private Cursor<V> sharedCursor;
     private QueueStore<K, V> store;
     private PersistencePolicy<V> persistencePolicy;
     private long nextSequenceNumber = 0;
@@ -93,11 +93,9 @@
     private final LinkedNodeList<SubscriptionContext> trailingConsumers = new LinkedNodeList<SubscriptionContext>();
 
     // Limiter/Controller for the size of the queue:
-    private final FlowController<V> sizeController;
+    private FlowController<V> inputController;
     private final IFlowSizeLimiter<V> sizeLimiter;
-
-    // Default cursor memory limit:
-    private static final long DEFAULT_MEMORY_LIMIT = 10;
+    private final boolean RELEASE_ON_ACQUISITION = true;
 
     private int totalQueueCount;
 
@@ -105,13 +103,13 @@
     private boolean started = false;
 
     private Mapper<Long, V> expirationMapper;
-    private final Expirator expirator = new Expirator();
+    private Expirator expirator;
 
     public SharedQueue(String name, IFlowSizeLimiter<V> limiter) {
         this(name, limiter, null);
     }
 
-    SharedQueue(String name, IFlowSizeLimiter<V> limiter, Object mutex) {
+    SharedQueue(String name, IFlowSizeLimiter<V> sizeLimiter, Object mutex) {
         super(name);
         this.mutex = mutex == null ? new Object() : mutex;
 
@@ -119,12 +117,7 @@
         queueDescriptor = new QueueStore.QueueDescriptor();
         queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
         queueDescriptor.setQueueType(QueueDescriptor.SHARED);
-        this.sizeLimiter = limiter;
-
-        this.sizeController = new FlowController<V>(getFlowControllableHook(), flow, limiter, this.mutex);
-        sizeController.useOverFlowQueue(false);
-        super.onFlowOpened(sizeController);
-
+        this.sizeLimiter = sizeLimiter;
         loader = new ElementLoader();
 
     }
@@ -152,7 +145,11 @@
                     persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
                 }
 
-                sharedCursor = new Cursor(queueDescriptor.getQueueName().toString(), true, true);
+                inputController = new FlowController<V>(null, flow, sizeLimiter, mutex);
+                inputController.useOverFlowQueue(false);
+                super.onFlowOpened(inputController);
+
+                sharedCursor = openCursor(getResourceName(), true, true);
 
                 // Initialize counts:
                 nextSequenceNumber = sequenceMax + 1;
@@ -160,7 +157,7 @@
                     sizeLimiter.add(count, size);
                     totalQueueCount = count;
                     // Add a paged out placeholder:
-                    QueueElement qe = new QueueElement(null, sequenceMin);
+                    QueueElement<V> qe = new QueueElement<V>(null, sequenceMin, this);
                     qe.loaded = false;
                     queue.add(qe);
                 }
@@ -177,6 +174,8 @@
                     };
                 }
 
+                expirator = new Expirator();
+
                 if (DEBUG)
                     System.out.println(this + "Initialized, first seq: " + sequenceMin + " next sequence: " + nextSequenceNumber);
             }
@@ -199,6 +198,65 @@
         }
     }
 
+    private final Cursor<V> openCursor(String name, boolean pageInElements, boolean skipAcquired) {
+
+        FlowController<QueueElement<V>> controller = null;
+        if (pageInElements && persistencePolicy.isPagingEnabled() && sizeLimiter.getCapacity() > persistencePolicy.getPagingInMemorySize()) {
+            IFlowSizeLimiter<QueueElement<V>> limiter = new SizeLimiter<QueueElement<V>>(persistencePolicy.getPagingInMemorySize(), persistencePolicy.getPagingInMemorySize() / 2) {
+                public int getElementSize(QueueElement<V> qe) {
+                    return qe.size;
+                };
+            };
+
+            controller = new FlowController<QueueElement<V>>(null, flow, limiter, mutex) {
+                @Override
+                public IFlowResource getFlowResource() {
+                    return SharedQueue.this;
+                }
+            };
+            controller.useOverFlowQueue(false);
+            controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+        }
+
+        return new Cursor<V>(queue, loader, name, skipAcquired, pageInElements, controller);
+    }
+
+    final int getElementSize(V elem) {
+        return sizeLimiter.getElementSize(elem);
+    }
+
+    final long getElementExpiration(V elem) {
+        return expirationMapper.map(elem);
+    }
+
+    final Expirator getExpirator() {
+        return expirator;
+    }
+
+    final QueueStore<K, V> getQueueStore() {
+        return store;
+    }
+
+    final ElementLoader getLoader() {
+        return loader;
+    }
+
+    final PersistencePolicy<V> getPersistencePolicy() {
+        return persistencePolicy;
+    }
+
+    final void acknowledge(QueueElement<V> qe) {
+        synchronized (mutex) {
+            V elem = qe.getElement();
+            if (qe.delete()) {
+                if (!qe.acquired || !RELEASE_ON_ACQUISITION) {
+                    inputController.elementDispatched(elem);
+                }
+                totalQueueCount--;
+            }
+        }
+    }
+
     /**
      * Starts this queue.
      */
@@ -211,6 +269,7 @@
             if (!started) {
                 started = true;
                 sharedCursor.activate();
+                loader.start();
                 expirator.start();
                 if (isDispatchReady()) {
                     notifyReady();
@@ -232,42 +291,102 @@
         stop();
     }
 
-    public void flowElemAccepted(ISourceController<V> source, V elem) {
+    public void add(V elem, ISourceController<?> source) {
+        synchronized (mutex) {
+            inputController.add(elem, source);
+            accepted(source, elem);
+        }
+    }
 
+    public boolean offer(V elem, ISourceController<?> source) {
         synchronized (mutex) {
 
-            if (!initialized) {
-                throw new IllegalStateException("Not able to use uninitialized queue: " + getResourceName());
+            if (inputController.offer(elem, source)) {
+                accepted(source, elem);
+                return true;
             }
+            return false;
+        }
+    }
 
-            // Create a new queue element with the next sequence number:
-            QueueElement qe = new QueueElement(elem, nextSequenceNumber++);
+    public void flowElemAccepted(ISourceController<V> source, V elem) {
+        synchronized (mutex) {
+            // TODO should change flow controller to pass original source:
+            accepted(null, elem);
+        }
+    }
 
-            // Save the element (note that it is important this be done after
-            // we've set the sequence number above)
-            if (persistencePolicy.isPersistent(elem)) {
-                // For now base decision on whether to delay flush on
-                // whether or not there are
-                // consumers ready.
-                boolean delayable = !sharedConsumers.isEmpty();
-                qe.save(source, delayable);
-            }
-
-            // Add it to our queue:
-            queue.add(qe);
-            totalQueueCount++;
-            // Check with the loader to see if it needs to be paged out:
-            loader.elementAdded(qe, source);
-            expirator.elementAdded(qe);
-
-            // Request dispatch for the newly enqueued element.
-            // TODO consider optimizing to do direct dispatch?
-            // It might be better if the dispatcher itself provided
-            // this for cases where the caller is on the same dispatcher
-            if (isDispatchReady()) {
-                notifyReady();
+    private final void accepted(ISourceController<?> source, V elem) {
+
+        if (!initialized) {
+            throw new IllegalStateException("Uninitialized queue: " + getResourceName());
+        }
+
+        // Create a new queue element with the next sequence number:
+        QueueElement<V> qe = new QueueElement<V>(elem, nextSequenceNumber++, this);
+
+        // Save the element (note that it is important this be done after
+        // we've set the sequence number above)
+        if (persistencePolicy.isPersistent(elem)) {
+            // For now base decision on whether to delay flush on
+            // whether or not there are
+            // consumers ready.
+            boolean delayable = !sharedConsumers.isEmpty();
+            qe.save(source, delayable);
+        }
+
+        // Add it to our queue:
+        queue.add(qe);
+        totalQueueCount++;
+        if (!persistencePolicy.isPagingEnabled()) {
+            qe.addHardRef();
+        }
+        // Check with the shared cursor to see if it is willing to
+        // absorb the element. If so that's good enough.
+        if (persistencePolicy.isPagingEnabled() && !sharedCursor.offer(qe, source)) {
+
+            // Otherwise check with any other open cursor to see if
+            // it can take the element:
+            Collection<Cursor<V>> active = loader.getActiveCursors(qe);
+
+            // If there are none, unload the element:
+            if (active == null) {
+                qe.unload(source);
+                return;
+            }
+
+            // See if a cursor is willing to hang on to the
+            // element:
+            boolean accepted = false;
+            for (Cursor<V> cursor : active) {
+                // Already checked the shared cursor above:
+                if (cursor == sharedCursor) {
+                    continue;
+                }
+
+                if (cursor.offer(qe, source)) {
+                    accepted = true;
+                    break;
+                }
+            }
+
+            // If no cursor accepted it, then page out the element:
+            // (an accepting cursor would have kept it loaded).
+            if (!accepted) {
+                qe.unload(source);
             }
         }
+
+        expirator.elementAdded(qe);
+
+        // Request dispatch for the newly enqueued element.
+        // TODO consider optimizing to do direct dispatch?
+        // It might be better if the dispatcher itself provided
+        // this for cases where the caller is on the same dispatcher
+        if (isDispatchReady()) {
+            notifyReady();
+            // while(pollingDispatch());
+        }
     }
 
     public boolean pollingDispatch() {
@@ -303,7 +422,7 @@
 
             // Process shared consumers:
             if (!sharedConsumers.isEmpty()) {
-                QueueElement next = sharedCursor.getNext();
+                QueueElement<V> next = sharedCursor.getNext();
                 if (next != null) {
 
                     // See if there are any interested consumers:
@@ -412,11 +531,11 @@
         boolean isStarted;
 
         // The consumer's cursor:
-        final Cursor cursor;
+        final Cursor<V> cursor;
 
         SubscriptionContext(Subscription<V> target) {
             this.sub = target;
-            this.cursor = new Cursor(target.toString(), !sub.isBrowser(), true);
+            this.cursor = openCursor(target.toString(), true, !sub.isBrowser());
             cursor.setCursorReadyListener(new CursorReadyListener() {
                 public void onElementReady() {
                     if (!isLinked()) {
@@ -468,7 +587,7 @@
                 return;
             }
 
-            QueueElement next = cursor.getNext();
+            QueueElement<V> next = cursor.getNext();
             // If the next element isn't yet available
             // then unlink this subscription
             if (next == null) {
@@ -546,7 +665,7 @@
             }
         }
 
-        public final int offer(QueueElement qe) {
+        public final int offer(QueueElement<V> qe) {
 
             // If we are already passed this element return NO_MATCH:
             if (cursor.getCurrentSequeunce() > qe.sequence) {
@@ -561,7 +680,7 @@
 
             // Check for expiration:
             if (qe.isExpired()) {
-                qe.acknowledge();
+                acknowledge(qe);
                 return ACCEPTED;
             }
 
@@ -571,7 +690,10 @@
             // See if the sink has room:
             if (sub.offer(qe.elem, this, callback)) {
                 if (!sub.isBrowser()) {
-                    qe.setAcquired(this);
+                    qe.setAcquired(true);
+                    if (RELEASE_ON_ACQUISITION) {
+                        inputController.elementDispatched(qe.getElement());
+                    }
 
                     // If remove on dispatch acknowledge now:
                     if (callback == null) {
@@ -632,16 +754,19 @@
         public void onElementReady();
     }
 
-    class Cursor implements Comparable<Cursor> {
+    static class Cursor<V> implements Comparable<Cursor<V>> {
 
         private CursorReadyListener readyListener;
 
         private final String name;
+        private final SharedQueue<?, V>.ElementLoader loader;
+        private final SortedLinkedList<QueueElement<V>> queue;
+
         private boolean activated = false;;
 
         // The next element for this cursor, always non null
         // if activated, unless no element available:
-        QueueElement current = null;
+        QueueElement<V> current = null;
         // The current sequence number for this cursor,
         // used when inactive or pointing to an element
         // sequence number beyond the queue's limit.
@@ -649,8 +774,8 @@
 
         // The cursor is holding references for all
         // elements between first and last inclusive:
-        QueueElement firstRef = null;
-        QueueElement lastRef = null;
+        QueueElement<V> firstRef = null;
+        QueueElement<V> lastRef = null;
         // This is set to the last block that for which
         // we have requested a load:
         long lastBlockRequest = -1;
@@ -659,38 +784,30 @@
         // When the limiter is set the cursor is able to
         // keep as many elements in memory as its limiter
         // allows.
-        private final IFlowSizeLimiter<QueueElement> memoryLimiter;
-        private final IFlowController<QueueElement> memoryController;
+        private final IFlowController<QueueElement<V>> memoryController;
 
         // Indicates whether this cursor skips acquired elements
         private final boolean skipAcquired;
         // Indicates whether this cursor will page in elements
-        // 
         private final boolean pageInElements;
 
-        public Cursor(String name, boolean skipAcquired, boolean pageInElements) {
+        private long limit = Long.MAX_VALUE;
+
+        public Cursor(SortedLinkedList<QueueElement<V>> queue, SharedQueue<?, V>.ElementLoader loader, String name, boolean skipAcquired, boolean pageInElements,
+                IFlowController<QueueElement<V>> memoryController) {
             this.name = name;
+            this.queue = queue;
+            this.loader = loader;
+
             this.skipAcquired = skipAcquired;
             this.pageInElements = pageInElements;
 
             // Set up a limiter if this cursor pages in elements, and memory
             // limit is less than the queue size:
-            if (pageInElements && persistencePolicy.isPagingEnabled() && DEFAULT_MEMORY_LIMIT < sizeLimiter.getCapacity()) {
-                memoryLimiter = new SizeLimiter<QueueElement>(DEFAULT_MEMORY_LIMIT, DEFAULT_MEMORY_LIMIT) {
-                    public int getElementSize(QueueElement qe) {
-                        return qe.size;
-                    };
-                };
-
-                memoryController = new FlowController<QueueElement>(null, flow, memoryLimiter, mutex) {
-                    @Override
-                    public IFlowResource getFlowResource() {
-                        return SharedQueue.this;
-                    }
-                };
+            if (pageInElements) {
+                this.memoryController = memoryController;
             } else {
-                memoryLimiter = null;
-                memoryController = null;
+                this.memoryController = null;
             }
         }
 
@@ -702,8 +819,8 @@
          *            The element for which to check.
          * @return
          */
-        public final boolean offer(QueueElement qe, ISourceController<?> controller) {
-            if (activated && memoryLimiter != null) {
+        public final boolean offer(QueueElement<V> qe, ISourceController<?> controller) {
+            if (activated && memoryController != null) {
                 getNext();
                 if (lastRef != null) {
                     // Return true if we absorbed it:
@@ -712,7 +829,11 @@
                     }
                     // If our last ref is close to this one reserve the element
                     else if (qe.getPrevious() == lastRef) {
-                        return addCursorRef(qe, controller);
+                        if (addCursorRef(qe, controller)) {
+                            return true;
+                        } else {
+                            return false;
+                        }
                     }
                 }
                 return false;
@@ -724,7 +845,7 @@
 
         public final void reset(long sequence) {
             updateSequence(sequence);
-            current = null;
+            updateCurrent(null);
         }
 
         public final void activate() {
@@ -746,7 +867,8 @@
 
                     // If we're passing into a new block release the old one:
                     if (firstRef.isLastInBlock()) {
-                        System.out.println(this + " releasing block:" + firstRef.restoreBlock);
+                        if (DEBUG)
+                            System.out.println(this + " releasing block:" + firstRef.restoreBlock);
                         loader.releaseBlock(this, firstRef.restoreBlock);
                     }
 
@@ -758,31 +880,47 @@
                 }
 
                 // Release the last requested block:
-                if (persistencePolicy.isPageOutPlaceHolders()) {
+                if (loader.isPageOutPlaceHolders() && lastBlockRequest >= 0) {
                     loader.releaseBlock(this, lastBlockRequest);
                 }
 
                 lastBlockRequest = -1;
 
-                // Let go of our current ref:
-                current = null;
+                updateCurrent(null);
                 activated = false;
             }
         }
 
         /**
+         * Updates the current ref. We keep a soft ref to the current to keep it
+         * in the queue so that we can get at the next without a costly lookup.
+         */
+        private final void updateCurrent(QueueElement<V> qe) {
+            if (qe == current) {
+                return;
+            }
+            if (current != null) {
+                current.releaseSoftRef();
+            }
+            current = qe;
+            if (current != null) {
+                current.addSoftRef();
+            }
+        }
+
+        /**
          * Makes sure elements are paged in
          */
         private final void updatePagingRefs() {
             if (!activated)
                 return;
 
-            if (pageInElements && memoryLimiter != null) {
+            if (pageInElements && memoryController != null) {
 
                 // Release memory references up to our sequence number
                 while (firstRef != null && firstRef.getSequence() < sequence) {
                     boolean lastInBlock = firstRef.isLastInBlock();
-                    QueueElement next = firstRef.getNext();
+                    QueueElement<V> next = firstRef.getNext();
                     firstRef.releaseHardRef(memoryController);
 
                     // If we're passing into a new block release the old one:
@@ -802,14 +940,14 @@
                 }
 
                 // Now add refs for as many elements as we can hold:
-                QueueElement next = null;
+                QueueElement<V> next = null;
                 if (lastRef == null) {
                     next = current;
                 } else {
                     next = lastRef.getNext();
                 }
 
-                while (next != null && !memoryLimiter.getThrottled()) {
+                while (next != null && !memoryController.isSinkBlocked()) {
                     if (!addCursorRef(next, null)) {
                         break;
                     }
@@ -819,11 +957,11 @@
             // Otherwise we still need to ensure the block has been loaded:
             else if (current != null && !current.isLoaded()) {
                 if (lastBlockRequest != current.restoreBlock) {
-                    if (persistencePolicy.isPageOutPlaceHolders()) {
+                    if (lastBlockRequest != -1) {
                         loader.releaseBlock(this, lastBlockRequest);
                     }
-                    loader.loadBlock(this, current.restoreBlock);
                     lastBlockRequest = current.restoreBlock;
+                    loader.reserveBlock(this, lastBlockRequest);
                 }
             }
         }
@@ -839,13 +977,13 @@
          *            The controller adding the element.
          * @return false if the element isn't in memory.
          */
-        private final boolean addCursorRef(QueueElement qe, ISourceController<?> controller) {
+        private final boolean addCursorRef(QueueElement<V> qe, ISourceController<?> controller) {
             // Make sure we have requested the block:
             if (qe.restoreBlock != lastBlockRequest) {
                 lastBlockRequest = qe.restoreBlock;
                 if (DEBUG)
-                    System.out.println(this + " requesting block:" + lastBlockRequest);
-                loader.loadBlock(this, lastBlockRequest);
+                    System.out.println(this + " requesting block:" + lastBlockRequest + " for" + qe);
+                loader.reserveBlock(this, lastBlockRequest);
             }
 
             // If the next element isn't loaded then we can't yet
@@ -865,25 +1003,22 @@
 
         private final void updateSequence(final long newSequence) {
             this.sequence = newSequence;
-            if (DEBUG && sequence > nextSequenceNumber) {
-                new Exception(this + "cursor overflow").printStackTrace();
-            }
         }
 
         /**
          * Sets the cursor to the next sequence number after the provided
          * element:
          */
-        public final void skip(QueueElement elem) {
-            QueueElement next = elem.isLinked() ? elem.getNext() : null;
+        public final void skip(QueueElement<V> elem) {
+            QueueElement<V> next = elem.isLinked() ? elem.getNext() : null;
 
             if (next != null) {
                 updateSequence(next.sequence);
                 if (activated) {
-                    current = next;
+                    updateCurrent(next);
                 }
             } else {
-                current = null;
+                updateCurrent(null);
                 updateSequence(sequence + 1);
             }
             updatePagingRefs();
@@ -893,21 +1028,22 @@
          * @return the next available element or null if one is not currently
          *         available.
          */
-        public final QueueElement getNext() {
+        public final QueueElement<V> getNext() {
 
             try {
                 if (queue.isEmpty() || queue.getTail().sequence < sequence) {
-                    current = null;
+                    updateCurrent(null);
                     return null;
                 }
 
                 if (queue.getTail().sequence == sequence) {
-                    current = queue.getTail();
+                    updateCurrent(queue.getTail());
                 }
 
-                // Get a pointer to the next element
-                if (current == null || !current.isLinked()) {
-                    current = queue.upper(sequence, true);
+                // If we don't have a current, then look it up based
+                // on our sequence:
+                if (current == null) {
+                    updateCurrent(queue.upper(sequence, true));
                     if (current == null) {
                         return null;
                     }
@@ -915,8 +1051,8 @@
 
                 // Skip removed elements (and acquired ones if requested)
                 while ((skipAcquired && current.isAcquired()) || current.isDeleted()) {
-                    QueueElement last = current;
-                    current = current.getNext();
+                    QueueElement<V> last = current;
+                    updateCurrent(current.getNext());
 
                     // If the next element is null, increment our sequence
                     // and return:
@@ -946,7 +1082,7 @@
             } finally {
                 // Don't hold on to a current ref if we aren't activated:
                 if (!activated) {
-                    current = null;
+                    updateCurrent(null);
                 }
                 updatePagingRefs();
             }
@@ -968,7 +1104,7 @@
             return sequence;
         }
 
-        public int compareTo(Cursor o) {
+        public int compareTo(Cursor<V> o) {
             if (o.sequence > sequence) {
                 return -1;
             } else if (sequence > o.sequence) {
@@ -997,7 +1133,9 @@
          */
         public void onElementsLoaded() {
             if (readyListener != null && isReady()) {
-                System.out.println(this + " notifying ready");
+                if (DEBUG) {
+                    System.out.println(this + " notifying ready");
+                }
                 readyListener.onElementReady();
             }
         }
@@ -1013,12 +1151,15 @@
          * @return true if the cursor has passed the end of the queue.
          */
         public boolean atEnd() {
-            // TODO Auto-generated method stub
             if (queue.isEmpty()) {
                 return true;
             }
 
-            QueueElement tail = queue.getTail();
+            if (sequence > limit) {
+                return true;
+            }
+
+            QueueElement<V> tail = queue.getTail();
             // Can't be at the end if the tail isn't loaded:
             if (!tail.isLoaded()) {
                 return false;
@@ -1034,12 +1175,20 @@
         public String toString() {
             return "Cursor: " + sequence + " [" + name + "]";
         }
+
+        /**
+         * @param l the sequence limit; atEnd() returns true once this cursor's sequence exceeds it
+         */
+        public void setLimit(long l) {
+            limit = l;
+        }
     }
 
-    class QueueElement extends SortedLinkedListNode<QueueElement> implements SubscriptionDeliveryCallback, SaveableQueueElement<V> {
+    static class QueueElement<V> extends SortedLinkedListNode<QueueElement<V>> implements SubscriptionDeliveryCallback, SaveableQueueElement<V> {
 
         final long sequence;
         final long restoreBlock;
+        final SharedQueue<?, V> queue;
 
         V elem;
         int size = -1;
@@ -1063,14 +1212,14 @@
         boolean saved = false;
 
         boolean deleted = false;
-        SubscriptionContext owner;
+        boolean acquired = false;
 
-        public QueueElement(V elem, long sequence) {
+        public QueueElement(V elem, long sequence, SharedQueue<?, V> queue) {
             this.elem = elem;
-
+            this.queue = queue;
             if (elem != null) {
-                size = sizeLimiter.getElementSize(elem);
-                expiration = expirationMapper.map(elem);
+                size = queue.getElementSize(elem);
+                expiration = queue.getElementExpiration(elem);
             }
             this.sequence = sequence;
             this.restoreBlock = sequence / RESTORE_BLOCK_SIZE;
@@ -1083,8 +1232,8 @@
             return deleted;
         }
 
-        public QueueElement(RestoredElement<V> restored) throws Exception {
-            this(restored.getElement(), restored.getSequenceNumber());
+        public QueueElement(RestoredElement<V> restored, SharedQueue<?, V> queue) throws Exception {
+            this(restored.getElement(), restored.getSequenceNumber(), queue);
             this.size = restored.getElementSize();
             this.expiration = restored.getExpiration();
             saved = true;
@@ -1103,12 +1252,12 @@
                 // If this is the first request for this
                 // element request a load:
                 if (hardRefs == 1) {
-                    loader.pageIn(this);
+                    queue.getLoader().pageIn(this);
                 }
             }
         }
 
-        public final void releaseHardRef(IFlowController<QueueElement> controller) {
+        public final void releaseHardRef(IFlowController<QueueElement<V>> controller) {
             hardRefs--;
             if (hardRefs == 0) {
                 unload(controller);
@@ -1131,35 +1280,33 @@
             assert softRefs >= 0;
         }
 
-        public final void setAcquired(SubscriptionContext owner) {
-            this.owner = owner;
+        public final void setAcquired(boolean val) {
+            this.acquired = val;
         }
 
         public final void acknowledge() {
-            synchronized (mutex) {
-                delete();
-            }
+            queue.acknowledge(this);
         }
 
-        public final void delete() {
+        public final boolean delete() {
             if (!deleted) {
                 deleted = true;
-                owner = null;
-                totalQueueCount--;
                 if (isExpirable()) {
-                    expirator.elementRemoved(this);
+                    queue.getExpirator().elementRemoved(this);
                 }
-                sizeController.elementDispatched(elem);
+
                 if (saved) {
-                    store.deleteQueueElement(queueDescriptor, elem);
+                    queue.getQueueStore().deleteQueueElement(queue.getDescriptor(), elem);
                 }
                 elem = null;
                 unload(null);
+                return true;
             }
+            return false;
         }
 
         public final void unacquire(ISourceController<?> source) {
-            owner = null;
+            acquired = false;
             if (isExpired()) {
                 acknowledge();
             } else {
@@ -1174,38 +1321,38 @@
          */
         public final void unload(ISourceController<?> controller) {
 
-            // Can't unlink if there is a cursor ref, the cursor
-            // needs this element to decrement it's limiter space
-            if (hardRefs > 0) {
+            // Don't page out if there is a hard ref to the element
+            // or if it is acquired (since we need the element
+            // during delete):
+            if (!deleted && (hardRefs > 0 || acquired)) {
                 return;
             }
 
             // If the element didn't require persistence on enqueue, then
             // we'll need to save it now before paging it out.
-            // Note that we don't page out the element if it has an owner
-            // because we need the element when we issue the delete.
-            if (owner == null && elem != null && !persistencePolicy.isPersistent(elem)) {
-                save(controller, true);
-                if (DEBUG)
-                    System.out.println("Paged out element: " + this);
-                elem = null;
-            }
+            if (elem != null) {
+                if (!deleted) {
+                    if (!queue.getPersistencePolicy().isPersistent(elem)) {
+                        save(controller, true);
+                        if (DEBUG)
+                            System.out.println("Paged out element: " + this);
+                    }
 
-            // If save is pending don't unload until the save has completed
-            if (savePending) {
-                return;
+                    // If save is pending don't unload until the save has
+                    // completed
+                    if (savePending) {
+                        return;
+                    }
+                }
+
+                elem = null;
             }
 
-            QueueElement next = getNext();
-            QueueElement prev = getPrevious();
+            QueueElement<V> next = getNext();
+            QueueElement<V> prev = getPrevious();
 
-            // See if we can unload this element:
-            // Don't unload the element if it is:
-            // -Has an owner (we keep the element in memory so we don't
-            // forget about the owner).
-            // -If there are soft references to it
-            // -Or it is in the load queue
-            if (owner == null && softRefs == 0 && !loader.inLoadQueue(this)) {
+            // See if we can unload this element; don't unload if it has soft refs:
+            if (softRefs == 0) {
                 // If deleted unlink this element from the queue, and link
                 // together adjacent paged out entries:
                 if (deleted) {
@@ -1215,7 +1362,10 @@
                     if (next != null && prev != null && !next.isLoaded() && !prev.isLoaded()) {
                         next.unlink();
                     }
-                } else {
+                }
+                // Otherwise as long as the element isn't acquired we can unload
+                // it. If it is acquired we keep the soft ref around
+                else if (!acquired && queue.getPersistencePolicy().isPageOutPlaceHolders()) {
 
                     loaded = false;
 
@@ -1230,6 +1380,8 @@
                     if (prev != null && !prev.isLoaded()) {
                         unlink();
                     }
+                } else {
+                    return;
                 }
             }
 
@@ -1246,9 +1398,9 @@
          * @throws Exception
          *             If there was an error creating the loaded element:
          */
-        public final QueueElement loadAfter(RestoredElement<V> re) throws Exception {
+        public final QueueElement<V> loadAfter(RestoredElement<V> re) throws Exception {
 
-            QueueElement ret = null;
+            QueueElement<V> ret = null;
 
             // See if this element represents the one being loaded:
             if (sequence == re.getSequenceNumber()) {
@@ -1263,14 +1415,15 @@
                     if (re.getNextSequenceNumber() != -1) {
                         // Otherwise if our next pointer doesn't match the
                         // next restored number:
-                        QueueElement next = getNext();
+                        QueueElement<V> next = getNext();
                         if (next == null || next.sequence != re.getNextSequenceNumber()) {
-                            next = new QueueElement(null, re.getNextSequenceNumber());
+                            next = new QueueElement<V>(null, re.getNextSequenceNumber(), queue);
                             next.loaded = false;
                             this.linkAfter(next);
                         }
                     }
                     this.size = re.getElementSize();
+                    this.expiration = re.getExpiration();
                 }
 
                 // If we're paged out set our elem to the restored one:
@@ -1281,9 +1434,9 @@
                 savePending = false;
 
             } else {
-                ret = new QueueElement(re);
+                ret = new QueueElement<V>(re, queue);
                 // Otherwise simply link this element into the list:
-                queue.add(ret);
+                queue.queue.add(ret);
             }
 
             if (DEBUG)
@@ -1301,7 +1454,7 @@
 
         public final boolean isLastInBlock() {
             if (isTailNode()) {
-                return nextSequenceNumber / RESTORE_BLOCK_SIZE != restoreBlock;
+                return queue.nextSequenceNumber / RESTORE_BLOCK_SIZE != restoreBlock;
             } else {
                 return next.restoreBlock != restoreBlock;
             }
@@ -1316,7 +1469,7 @@
         }
 
         public final boolean isAcquired() {
-            return owner != null || deleted;
+            return acquired || deleted;
         }
 
         public final long getExpiration() {
@@ -1333,14 +1486,14 @@
 
         public final void save(ISourceController<?> controller, boolean delayable) {
             if (!saved) {
-                store.persistQueueElement(this, controller, delayable);
+                queue.getQueueStore().persistQueueElement(this, controller, delayable);
                 saved = true;
 
                 // If paging is enabled we can't unload the element until it
                 // is saved, otherwise there is no guarantee that it will be
                 // in the store on a subsequent load requests because the
                 // save is done asynchronously.
-                if (persistencePolicy.isPagingEnabled()) {
+                if (queue.getPersistencePolicy().isPagingEnabled()) {
                     savePending = true;
                 }
             }
@@ -1375,7 +1528,8 @@
          * ()
          */
         public void notifySave() {
-            synchronized (mutex) {
+            // TODO Refactor this:
+            synchronized (queue.mutex) {
                 // Unload if we haven't already:
                 if (isLinked()) {
                     savePending = false;
@@ -1402,18 +1556,18 @@
          * getQueueDescriptor()
          */
         public QueueDescriptor getQueueDescriptor() {
-            return queueDescriptor;
+            return queue.getDescriptor();
         }
 
         public String toString() {
-            return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
+            return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " aquired: " + acquired;
         }
 
     }
 
     private class Expirator {
 
-        private final Cursor cursor = new Cursor("Expirator", false, false);
+        private final Cursor<V> cursor = openCursor("Expirator-" + getResourceName(), false, false);
         // Number of expirable elements in the queue:
         private int count = 0;
 
@@ -1423,13 +1577,13 @@
 
         private static final int MAX_CACHE_SIZE = 500;
         private long uncachedMin = Long.MAX_VALUE;
-        TreeMap<Long, HashSet<QueueElement>> expirationCache = new TreeMap<Long, HashSet<QueueElement>>();
+        TreeMap<Long, HashSet<QueueElement<V>>> expirationCache = new TreeMap<Long, HashSet<QueueElement<V>>>();
         private int cacheSize = 0;
 
         public final boolean needsDispatch() {
             // If we have expiration candidates or are scanning the
             // queue request dispatch:
-            return hasExpirationCandidates() || cursor.isReady();
+            return hasExpirables() || cursor.isReady();
         }
 
         public void start() {
@@ -1453,7 +1607,9 @@
         }
 
         public void dispatch() {
-
+            if (!needsDispatch()) {
+                return;
+            }
             long now = -1;
             // If their are uncached elements in the queue that are ready for
             // expiration
@@ -1465,7 +1621,7 @@
 
             // Scan the queue looking for expirables:
             if (cursor.isReady()) {
-                QueueElement qe = cursor.getNext();
+                QueueElement<V> qe = cursor.getNext();
                 while (qe != null) {
                     if (!loaded) {
                         if (qe.sequence < recoverySequence) {
@@ -1485,7 +1641,8 @@
 
                 // Finished loading:
                 if (!loaded && cursor.getCurrentSequeunce() >= recoverySequence) {
-                    System.out.println(this + " Queue Load Complete");
+                    if (DEBUG)
+                        System.out.println(this + " Queue Load Complete");
                     loaded = true;
                     cursor.deactivate();
                 } else if (cursor.atEnd()) {
@@ -1493,15 +1650,15 @@
                 }
             }
 
-            if (now == -1) {
+            if (now == -1 && !expirationCache.isEmpty()) {
                 now = System.currentTimeMillis();
             }
 
             // 
             while (!expirationCache.isEmpty()) {
-                Entry<Long, HashSet<QueueElement>> first = expirationCache.firstEntry();
+                Entry<Long, HashSet<QueueElement<V>>> first = expirationCache.firstEntry();
                 if (first.getKey() < now) {
-                    for (QueueElement qe : first.getValue()) {
+                    for (QueueElement<V> qe : first.getValue()) {
                         qe.releaseSoftRef();
                         qe.acknowledge();
                     }
@@ -1509,7 +1666,7 @@
             }
         }
 
-        public void elementAdded(QueueElement qe) {
+        public void elementAdded(QueueElement<V> qe) {
             if (qe.isExpirable() && !qe.isDeleted()) {
                 count++;
                 if (qe.isExpired()) {
@@ -1520,10 +1677,10 @@
             }
         }
 
-        private void addToCache(QueueElement qe) {
+        private void addToCache(QueueElement<V> qe) {
             // See if we should cache it, evicting entries if possible
             if (cacheSize >= MAX_CACHE_SIZE) {
-                Entry<Long, HashSet<QueueElement>> last = expirationCache.lastEntry();
+                Entry<Long, HashSet<QueueElement<V>>> last = expirationCache.lastEntry();
                 if (last.getKey() <= qe.expiration) {
                     // Keep track of the minimum uncached value:
                     if (qe.expiration < uncachedMin) {
@@ -1533,7 +1690,7 @@
                 }
 
                 // Evict the entry:
-                Iterator<QueueElement> i = last.getValue().iterator();
+                Iterator<QueueElement<V>> i = last.getValue().iterator();
                 removeFromCache(i.next());
 
                 if (last.getKey() <= uncachedMin) {
@@ -1542,19 +1699,19 @@
                 }
             }
 
-            HashSet<QueueElement> entry = new HashSet<QueueElement>();
+            HashSet<QueueElement<V>> entry = new HashSet<QueueElement<V>>();
             entry.add(qe);
             qe.addSoftRef();
             cacheSize++;
-            HashSet<QueueElement> old = expirationCache.put(qe.expiration, entry);
+            HashSet<QueueElement<V>> old = expirationCache.put(qe.expiration, entry);
             if (old != null) {
                 old.add(qe);
                 expirationCache.put(qe.expiration, old);
             }
         }
 
-        private final void removeFromCache(QueueElement qe) {
-            HashSet<QueueElement> last = expirationCache.get(qe.expiration);
+        private final void removeFromCache(QueueElement<V> qe) {
+            HashSet<QueueElement<V>> last = expirationCache.get(qe.expiration);
             if (last != null && last.remove(qe.getSequenceNumber())) {
                 cacheSize--;
                 qe.releaseSoftRef();
@@ -1564,7 +1721,7 @@
             }
         }
 
-        public void elementRemoved(QueueElement qe) {
+        public void elementRemoved(QueueElement<V> qe) {
             // While loading, ignore elements that we haven't been seen yet.
             if (!loaded && qe.sequence < recoverySequence && qe.sequence > lastRecoverdSequence) {
                 return;
@@ -1577,11 +1734,7 @@
             }
         }
 
-        public boolean hasExpirationCandidates() {
-            return !loaded || hasExpirables();
-        }
-
-        public boolean hasExpirables() {
+        public final boolean hasExpirables() {
             if (count == 0) {
                 return false;
             } else {
@@ -1617,85 +1770,60 @@
     private class ElementLoader implements RestoreListener<V> {
 
         private LinkedList<QueueStore.RestoredElement<V>> fromDatabase = new LinkedList<QueueStore.RestoredElement<V>>();
-        private final HashMap<Long, HashSet<Cursor>> requestedBlocks = new HashMap<Long, HashSet<Cursor>>();
-        private final HashSet<Cursor> pagingCursors = new HashSet<Cursor>();
+        private final HashMap<Long, HashSet<Cursor<V>>> requestedBlocks = new HashMap<Long, HashSet<Cursor<V>>>();
+        private final HashSet<Cursor<V>> pagingCursors = new HashSet<Cursor<V>>();
 
-        public boolean inLoadQueue(QueueElement queueElement) {
+        private boolean loadOnRequest = false;
+        private Cursor<V> recoveryCursor = null;
+
+        public final void start() {
+
+            // If paging is enabled and we page out placeholders (rather
+            // than keeping them in memory) then we load on
+            // request.
+            if (persistencePolicy.isPagingEnabled() && persistencePolicy.isPageOutPlaceHolders()) {
+                loadOnRequest = true;
+            } else {
+                loadOnRequest = false;
+                if (getEnqueuedCount() > 0) {
+                    recoveryCursor = openCursor("Loader", false, false);
+                    recoveryCursor.setLimit(nextSequenceNumber - 1);
+                    recoveryCursor.activate();
+                }
+            }
+        }
+
+        public boolean inLoadQueue(QueueElement<V> queueElement) {
             return requestedBlocks.containsKey(queueElement.restoreBlock);
         }
 
+        public final boolean isPageOutPlaceHolders() {
+            return persistencePolicy.isPageOutPlaceHolders();
+        }
+
         /**
          * @param queueElement
          */
-        public void pageIn(QueueElement qe) {
+        public void pageIn(QueueElement<V> qe) {
             store.restoreQueueElements(queueDescriptor, false, qe.sequence, qe.sequence, 1, this);
         }
 
-        /**
-         * Must be called after an element is added to the queue to enforce
-         * memory limits
-         * 
-         * @param elem
-         *            The added element:
-         * @param source
-         *            The source of the message
-         */
-        public final void elementAdded(QueueElement qe, ISourceController<V> source) {
-
-            if (persistencePolicy.isPagingEnabled()) {
-
-                // Check with the shared cursor to see if it is willing to
-                // absorb the element. If so that's good enough.
-                if (sharedCursor.offer(qe, source)) {
-                    return;
-                }
-
-                // Otherwise check with any other open cursor to see if
-                // it can take the element:
-                HashSet<Cursor> active = requestedBlocks.get(qe.sequence);
-
-                // If there are none, unload the element:
-                if (active == null) {
-                    qe.unload(source);
-                    return;
-                }
-
-                // See if a cursor is willing to hang on to the
-                // element:
-                boolean accepted = false;
-                for (Cursor cursor : active) {
-                    // Already checked the shared cursor above:
-                    if (cursor == sharedCursor) {
-                        continue;
-                    }
-
-                    if (cursor.offer(qe, source)) {
-                        accepted = true;
-                    }
-                }
-
-                // If no cursor accepted it, then page out the element:
-                // keeping the element loaded.
-                if (!accepted) {
-                    qe.unload(source);
-                }
-            }
+        public final Collection<Cursor<V>> getActiveCursors(QueueElement<V> qe) {
+            return requestedBlocks.get(qe.getSequence());
         }
 
-        // Updates memory when an element is loaded from the database:
-        private final void elementLoaded(QueueElement qe) {
-            // TODO track the rate of loaded elements vs those that
-            // are added to the queue. We'll want to throttle back
-            // enqueueing sources to a rate less than the restore
-            // rate so we can stay out of the store.
-        }
+        public void reserveBlock(Cursor<V> cursor, long block) {
+            HashSet<Cursor<V>> cursors = requestedBlocks.get(block);
+            boolean load = recoveryCursor != null && cursor == recoveryCursor;
 
-        public void loadBlock(Cursor cursor, long block) {
-            HashSet<Cursor> cursors = requestedBlocks.get(block);
             if (cursors == null) {
-                cursors = new HashSet<Cursor>();
+                cursors = new HashSet<Cursor<V>>();
                 requestedBlocks.put(block, cursors);
+                load |= loadOnRequest;
+            }
+            cursors.add(cursor);
 
+            if (load) {
                 // Max sequence number is the end of this restoreBlock:
                 long firstSequence = block * RESTORE_BLOCK_SIZE;
                 long maxSequence = block * RESTORE_BLOCK_SIZE + RESTORE_BLOCK_SIZE - 1;
@@ -1706,21 +1834,17 @@
                 if (DEBUG)
                     System.out.println(cursor + " requesting restoreBlock:" + block + " from " + firstSequence + " to " + maxSequence + " max: " + maxCount + " queueMax: " + nextSequenceNumber);
 
-                // If paging is enabled only pull in queue records, don't bring
+                // If paging is enabled only pull in queue records, don't
+                // bring
                 // in the payload.
                 // Each active cursor will have to pull in messages based on
                 // available memory.
                 store.restoreQueueElements(queueDescriptor, persistencePolicy.isPagingEnabled(), firstSequence, maxSequence, maxCount, this);
             }
-            cursors.add(cursor);
         }
 
-        public void releaseBlock(Cursor cursor, long block) {
-            // Don't do anything if we don't page out placeholders
-            if (!persistencePolicy.isPageOutPlaceHolders()) {
-                return;
-            }
-            HashSet<Cursor> cursors = requestedBlocks.get(block);
+        public void releaseBlock(Cursor<V> cursor, long block) {
+            HashSet<Cursor<V>> cursors = requestedBlocks.get(block);
             if (cursors == null) {
                 if (true || DEBUG)
                     System.out.println(this + " removeBlockInterest " + block + ", no cursors" + cursor);
@@ -1731,9 +1855,9 @@
                         // If this is the last cursor active in this block
                         // unload the block:
                         if (persistencePolicy.isPagingEnabled()) {
-                            QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
+                            QueueElement<V> qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
                             while (qe != null && qe.restoreBlock == block) {
-                                QueueElement next = qe.getNext();
+                                QueueElement<V> next = qe.getNext();
                                 qe.unload(cursor.memoryController);
                                 qe = next;
                             }
@@ -1765,7 +1889,7 @@
                 for (RestoredElement<V> restored : restoredElems) {
                     try {
 
-                        QueueElement qe = queue.lower(restored.getSequenceNumber(), true);
+                        QueueElement<V> qe = queue.lower(restored.getSequenceNumber(), true);
 
                         // If we don't have a paged out place holder for this
                         // element
@@ -1776,7 +1900,13 @@
                         }
 
                         qe = qe.loadAfter(restored);
-                        elementLoaded(qe);
+
+                        // If paging isn't enabled then add a hard ref to the
+                        // element,
+                        // this will keep it around until it is deleted
+                        if (!persistencePolicy.isPagingEnabled()) {
+                            qe.addHardRef();
+                        }
 
                         // If we don't page out place holders we needn't track
                         // block
@@ -1795,11 +1925,25 @@
                 }
 
                 // Add restoring consumers back to trailing consumers:
-                for (Cursor paging : pagingCursors)
+                for (Cursor<V> paging : pagingCursors)
                     paging.onElementsLoaded();
 
                 pagingCursors.clear();
             }
+
+            // Advance the recovery cursor:
+            if (recoveryCursor != null) {
+                while (recoveryCursor.isReady()) {
+                    QueueElement<V> qe = recoveryCursor.getNext();
+                    if (recoveryCursor.atEnd()) {
+                        recoveryCursor.deactivate();
+                        recoveryCursor = null;
+                        break;
+                    }
+                    recoveryCursor.skip(qe);
+
+                }
+            }
         }
 
         public final boolean hasRestoredMessages() {
@@ -1851,7 +1995,7 @@
 
     @Override
     protected ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
-        return sizeController;
+        return inputController;
     }
 
     public V poll() {
@@ -1859,6 +2003,6 @@
     }
 
     public IFlowController<V> getFlowControler() {
-        return sizeController;
+        return inputController;
     }
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu May 14 13:41:14 2009
@@ -40,7 +40,7 @@
 
 public abstract class BrokerTestBase extends TestCase {
 
-    protected static final int PERFORMANCE_SAMPLES = 3;
+    protected static final int PERFORMANCE_SAMPLES = 5;
 
     protected static final int IO_WORK_AMOUNT = 0;
     protected static final int FANIN_COUNT = 10;
@@ -118,6 +118,9 @@
     }
 
     public void test_1_1_0() throws Exception {
+        if (ptp) {
+            return;
+        }
         producerCount = 1;
         destCount = 1;
 
@@ -376,7 +379,7 @@
             sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
             brokers.add(sendBroker);
         }
-        
+
         startBrokers();
 
         Destination[] dests = new Destination[destCount];
@@ -476,7 +479,7 @@
             store = StoreFactory.createStore("memory");
         }
 
-        store.setStoreDirectory(new File("sub/test-data/broker-test/" +  broker.getName()));
+        store.setStoreDirectory(new File("sub/test-data/broker-test/" + broker.getName()));
         store.setDeleteAllMessages(PURGE_STORE);
         return store;
     }
@@ -497,15 +500,14 @@
         }
     }
 
-    private void startBrokers() throws Exception
-    {
+    private void startBrokers() throws Exception {
         for (MessageBroker broker : brokers) {
             broker.start();
         }
     }
-    
+
     private void startClients() throws Exception {
-        
+
         for (RemoteConsumer connection : consumers) {
             connection.start();
         }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Thu May 14 13:41:14 2009
@@ -31,6 +31,7 @@
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.dispatch.IDispatcher;
@@ -68,7 +69,7 @@
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
     private static final boolean PERSISTENT = true;
-    private static final boolean PURGE_STORE = false;
+    private static final boolean PURGE_STORE = true;
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
     protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -111,7 +112,7 @@
             store = StoreFactory.createStore("memory");
         }
 
-        store.setStoreDirectory(new File("test-data/shared-message-queue-test/"));
+        store.setStoreDirectory(new File("test-data/shared-queue-perf-test/"));
         store.setDeleteAllMessages(PURGE_STORE);
         return store;
     }
@@ -121,6 +122,7 @@
         producers.clear();
         queues.clear();
         stopServices();
+        consumerStartDelay = 0;
     }
 
     public void testSharedQueue_1_1_1() throws Exception {
@@ -141,6 +143,7 @@
         try {
             createQueues(1);
             createProducers(1);
+            consumerStartDelay = 10;
             createConsumers(1);
             doTest();
 
@@ -202,56 +205,48 @@
     }
 
     private void doTest() throws Exception {
-        
-        try
-        {
+
+        try {
             // Start queues:
             for (IQueue<Long, MessageDelivery> queue : queues) {
                 queue.start();
             }
-    
-            Runnable startConsumers = new Runnable()
-            {
-                public void run()
-                {
+
+            Runnable startConsumers = new Runnable() {
+                public void run() {
                     // Start consumers:
                     for (Consumer consumer : consumers) {
                         consumer.start();
                     }
                 }
             };
-            
-            if(consumerStartDelay > 0)
-            {
+
+            if (consumerStartDelay > 0) {
                 dispatcher.schedule(startConsumers, consumerStartDelay, TimeUnit.SECONDS);
-            }
-            else
-            {
+            } else {
                 startConsumers.run();
             }
-            
+
             // Start producers:
             for (Producer producer : producers) {
                 producer.start();
             }
             reportRates();
-        }
-        finally
-        {
+        } finally {
             // Stop producers:
             for (Producer producer : producers) {
                 producer.stop();
             }
-            
+
             // Stop consumers:
             for (Consumer consumer : consumers) {
                 consumer.stop();
             }
-            
+
             // Stop queues:
             for (IQueue<Long, MessageDelivery> queue : queues) {
                 queue.stop();
-            }        
+            }
         }
     }
 
@@ -303,7 +298,7 @@
         protected final IFlowRelay<OpenWireMessageDelivery> outboundQueue;
         protected OpenWireMessageDelivery next;
         private int priority;
-        private final byte[] payload;
+        private final String payload;
         private int sequenceNumber;
         private final ActiveMQDestination destination;
         private final IQueue<Long, MessageDelivery> targetQueue;
@@ -316,14 +311,21 @@
             rate.name("Producer " + name + " Rate");
             totalProducerRate.add(rate);
             dispatchContext = dispatcher.register(this, name);
-            payload = new byte[1024];
+            // create a 1024 byte payload (2 bytes per char):
+            payload = new String(new byte[512]);
             producerId = new ProducerId(name);
             wireFormat = new OpenWireFormat();
             wireFormat.setCacheEnabled(false);
             wireFormat.setSizePrefixDisabled(false);
             wireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
 
-            SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(1000, 500);
+            SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(1000 * 1024, 500 * 1024) {
+                @Override
+                public int getElementSize(OpenWireMessageDelivery elem) {
+                    return elem.getFlowLimiterSize();
+                }
+            };
+
             Flow flow = new Flow(name, true);
             outboundQueue = new SingleFlowRelay<OpenWireMessageDelivery>(flow, name, limiter);
             outboundQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
@@ -384,14 +386,14 @@
         }
 
         private void createNextMessage() throws JMSException {
-            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
             message.setJMSPriority(priority);
             message.setProducerId(producerId);
             message.setMessageId(new MessageId(name, ++sequenceNumber));
             message.setDestination(destination);
             message.setPersistent(PERSISTENT);
             if (payload != null) {
-                message.writeBytes(payload);
+                message.setText(payload);
             }
             next = new OpenWireMessageDelivery(message);
         }
@@ -414,12 +416,14 @@
         private final ExclusiveQueue<MessageDelivery> queue;
         private final IQueue<Long, MessageDelivery> sourceQueue;
         private final QueueStore.QueueDescriptor queueDescriptor;
+        private int limit = 20000;
+        private int count = 0;
 
         public Consumer(String name, IQueue<Long, MessageDelivery> sourceQueue) {
             this.sourceQueue = sourceQueue;
             this.name = name;
             Flow flow = new Flow(name + "-outbound", false);
-            limiter = new SizeLimiter<MessageDelivery>(1024, 512) {
+            limiter = new SizeLimiter<MessageDelivery>(1024 * 1024, 512 * 1024) {
                 public int getElementSize(MessageDelivery m) {
                     return m.getFlowLimiterSize();
                 }
@@ -439,6 +443,10 @@
                 public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
                     elem.acknowledge(queueDescriptor);
                     rate.increment();
+                    /*
+                    if (count++ == limit) {
+                        queue.stop();
+                    }*/
                 }
             });
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Thu May 14 13:41:14 2009
@@ -101,9 +101,17 @@
             try {
                 msg.setStringProperty(property, property);
             } catch (JMSException e) {
-                new RuntimeException(e);
+                throw new RuntimeException(e);
             }
         }
+        
+        //Call the before marshal method to sync the content so we get an 
+        //accurate size:
+        try {
+            msg.beforeMarshall(null);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         next = new OpenWireMessageDelivery(msg);
     }
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu May 14 13:41:14 2009
@@ -13,6 +13,7 @@
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SharedPriorityQueue;
 import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.SharedQueueOld;
 import org.apache.activemq.queue.Subscription;
 import org.apache.activemq.util.Mapper;
 
@@ -27,6 +28,7 @@
     private Mapper<Long, Message> keyExtractor;
     private final MockStoreAdapater store = new MockStoreAdapater();
     private static final PersistencePolicy<Message> NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY<Message>();
+    private static final boolean USE_OLD_QUEUE = false;
     
     private IQueue<Long, Message> createQueue() {
 
@@ -61,15 +63,27 @@
             queue.initialize(0, 0, 0, 0);
             return queue;
         } else {
-            SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
-            SharedQueue<Long, Message> queue = new SharedQueue<Long, Message>(destination.getName().toString(), limiter);
-            queue.setKeyMapper(keyExtractor);
-            queue.setAutoRelease(true);
-            queue.setDispatcher(broker.getDispatcher());
-            queue.setStore(store);
-            queue.setPersistencePolicy(NO_PERSISTENCE);
-            queue.initialize(0, 0, 0, 0);
-            return queue;
+            if (USE_OLD_QUEUE) {
+                SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
+                SharedQueueOld<Long, Message> queue = new SharedQueueOld<Long, Message>(destination.getName().toString(), limiter);
+                queue.setKeyMapper(keyExtractor);
+                queue.setAutoRelease(true);
+                queue.setDispatcher(broker.getDispatcher());
+                queue.setStore(store);
+                queue.setPersistencePolicy(NO_PERSISTENCE);
+                queue.initialize(0, 0, 0, 0);
+                return queue;
+            } else {
+                SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
+                SharedQueue<Long, Message> queue = new SharedQueue<Long, Message>(destination.getName().toString(), limiter);
+                queue.setKeyMapper(keyExtractor);
+                queue.setAutoRelease(true);
+                queue.setDispatcher(broker.getDispatcher());
+                queue.setStore(store);
+                queue.setPersistencePolicy(NO_PERSISTENCE);
+                queue.initialize(0, 0, 0, 0);
+                return queue;
+            }
         }
     }
 
@@ -83,7 +97,7 @@
 
     public final void addConsumer(final DeliveryTarget dt) {
         Subscription<Message> sub = new Subscription<Message>() {
-            
+
             public boolean isBrowser() {
                 return false;
             }