You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 17:22:05 UTC

svn commit: r655323 [4/4] - in /incubator/qpid/branches/broker-queue-refactor/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main...

Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Sun May 11 08:22:03 2008
@@ -59,24 +59,6 @@
  *     <td> {@link Job}, {@link Job.JobCompletionHandler}
  * </table>
  *
- * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events.
- *       The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
- *       pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
- *       so there is concurrency. But why go to the trouble of seperating out the read and write events in that case?
- *       Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these
- *       stages.
- *
- * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
- *       it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run
- *       and trips off another batch of 10 until they are all done. Why not just have a straight forward
- *       consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10
- *       in them, just have one queue of events and worker threads taking the next event. There will be coordination
- *       between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same
- *       amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker
- *       pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be
- *       better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily
- *       be substituted in.
- *
  * @todo The static helper methods are pointless. Could just call new.
  */
 public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
@@ -95,17 +77,20 @@
 
     private final int _maxEvents;
 
+    private final boolean _readFilter;
+
     /**
      * Creates a named pooling filter, on the specified shared thread pool.
      *
      * @param refCountingPool The thread pool reference.
      * @param name            The identifying name of the filter type.
      */
-    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
+    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
     {
         _poolReference = refCountingPool;
         _name = name;
         _maxEvents = maxEvents;
+        _readFilter = readFilter;
     }
 
     /**
@@ -166,7 +151,6 @@
     void fireAsynchEvent(Job job, Event event)
     {
 
-        // job.acquire(); //prevents this job being removed from _jobs
         job.add(event);
 
         final ExecutorService pool = _poolReference.getPool();
@@ -200,7 +184,7 @@
      */
     public void createNewJobForSession(IoSession session)
     {
-        Job job = new Job(session, this, MAX_JOB_EVENTS);
+        Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
         session.setAttribute(_name, job);
     }
 
@@ -216,18 +200,6 @@
         return (Job) session.getAttribute(_name);
     }
 
-    /*private Job createJobForSession(IoSession session)
-    {
-        return addJobForSession(session, new Job(session, this, _maxEvents));
-    }*/
-
-    /*private Job addJobForSession(IoSession session, Job job)
-    {
-        // atomic so ensures all threads agree on the same job
-        Job existing = _jobs.putIfAbsent(session, job);
-
-        return (existing == null) ? job : existing;
-    }*/
 
     /**
      * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
@@ -238,16 +210,6 @@
      */
     public void completed(IoSession session, Job job)
     {
-        // if (job.isComplete())
-        // {
-        // job.release();
-        // if (!job.isReferenced())
-        // {
-        // _jobs.remove(session);
-        // }
-        // }
-        // else
-
 
         if (!job.isComplete())
         {
@@ -454,7 +416,7 @@
          */
         public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
         }
 
         /**
@@ -497,7 +459,7 @@
          */
         public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
         }
 
         /**

Added: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java Sun May 11 08:22:03 2008
@@ -0,0 +1,432 @@
+package org.apache.qpid.pool;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
+{
+    /** Total number of jobs held across both underlying queues. */
+    private final AtomicInteger _count = new AtomicInteger(0);
+    /** Held by consumers while removing jobs, and by producers while signalling them. */
+    private final ReentrantLock _takeLock = new ReentrantLock();
+    /** Signalled when a job is added to an empty queue; waited on by blocked consumers. */
+    private final Condition _notEmpty = _takeLock.newCondition();
+    /** Serialises producers adding jobs. */
+    private final ReentrantLock _putLock = new ReentrantLock();
+    /** Jobs for which isReadJob() returned true when they were offered. */
+    private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>();
+    /** All other jobs; these are handed out before anything in the read queue. */
+    private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>();
+
+
+    /** Iterates over the queued write jobs followed by the queued read jobs. */
+    private class ReadWriteJobIterator implements Iterator<Runnable>
+    {
+        /** True once iteration has moved on from the write queue to the read queue. */
+        private boolean _onReads;
+        /** Iterator over whichever underlying queue is currently being walked. */
+        private Iterator<Job> _iter = _writeJobQueue.iterator();
+
+        /** @return true if the current queue, or the not yet visited read queue, holds another job. */
+        public boolean hasNext()
+        {
+            if(_iter.hasNext())
+            {
+                return true;
+            }
+            if(_onReads)
+            {
+                // Both queues have already been walked.
+                return false;
+            }
+            // Write jobs exhausted: switch, exactly once, to the read queue.
+            _iter = _readJobQueue.iterator();
+            _onReads = true;
+            return _iter.hasNext();
+        }
+
+        /** @return the next job; throws java.util.NoSuchElementException once both queues are exhausted. */
+        public Runnable next()
+        {
+            // Go through hasNext() so the switch to the read queue happens even if the caller skipped it.
+            if(!hasNext())
+            {
+                throw new java.util.NoSuchElementException();
+            }
+            return _iter.next();
+        }
+
+        /** Removes the job last returned by next() and keeps the queue's job count in step. */
+        public void remove()
+        {
+            _takeLock.lock();
+            try
+            {
+                _iter.remove();
+                // Decrement only after a successful removal; if remove() throws, the count is untouched.
+                _count.decrementAndGet();
+            }
+            finally
+            {
+                _takeLock.unlock();
+            }
+        }
+    }
+
+    public Iterator<Runnable> iterator()
+    {
+        return new ReadWriteJobIterator(); // a fresh iterator per call; it starts on the write queue
+    }
+
+    public int size()
+    {
+        return _count.get(); // read from the shared counter rather than by sizing the two queues
+    }
+
+    /**
+     * Appends the job to the read or the write queue, whichever matches its type.
+     * @param runnable The job to enqueue; it is cast to {@link Job}.
+     * @return Always true.
+     */
+    public boolean offer(final Runnable runnable)
+    {
+        final Job newJob = (Job) runnable;
+        _putLock.lock();
+        try
+        {
+            final ConcurrentLinkedQueue<Job> target = newJob.isReadJob() ? _readJobQueue : _writeJobQueue;
+            target.offer(newJob);
+            // Signal a waiting consumer only if the queue was empty before this insertion.
+            final boolean wasEmpty = (_count.getAndIncrement() == 0);
+            if(wasEmpty)
+            {
+                _takeLock.lock();
+                try
+                {
+                    _notEmpty.signal();
+                }
+                finally
+                {
+                    _takeLock.unlock();
+                }
+            }
+            return true;
+        }
+        finally
+        {
+            _putLock.unlock();
+        }
+    }
+
+    /**
+     * Appends the job to the read or the write queue, whichever matches its type.
+     * No capacity check is made, so the call never waits for space.
+     *
+     * @param runnable The job to enqueue; it is cast to {@link Job}.
+     * @throws InterruptedException Declared for the interface; nothing in this method throws it.
+     */
+    public void put(final Runnable runnable) throws InterruptedException
+    {
+        final Job newJob = (Job) runnable;
+        _putLock.lock();
+        try
+        {
+            final ConcurrentLinkedQueue<Job> target = newJob.isReadJob() ? _readJobQueue : _writeJobQueue;
+            target.offer(newJob);
+            // Signal a waiting consumer only if the queue was empty before this insertion.
+            final boolean wasEmpty = (_count.getAndIncrement() == 0);
+            if(wasEmpty)
+            {
+                _takeLock.lock();
+                try
+                {
+                    _notEmpty.signal();
+                }
+                finally
+                {
+                    _takeLock.unlock();
+                }
+            }
+        }
+        finally
+        {
+            _putLock.unlock();
+        }
+    }
+
+
+
+    /**
+     * Appends the job to the read or the write queue, whichever matches its type; the timeout is never consulted.
+     * @param runnable The job to enqueue; it is cast to {@link Job}.
+     * @param timeout  Not used by this method.
+     * @param unit     Not used by this method.
+     * @return Always true.
+     * @throws InterruptedException Declared for the interface; nothing in this method throws it.
+     */
+    public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
+    {
+        final Job newJob = (Job) runnable;
+        _putLock.lock();
+        try
+        {
+            final ConcurrentLinkedQueue<Job> target = newJob.isReadJob() ? _readJobQueue : _writeJobQueue;
+            target.offer(newJob);
+            // Signal a waiting consumer only if the queue was empty before this insertion.
+            final boolean wasEmpty = (_count.getAndIncrement() == 0);
+            if(wasEmpty)
+            {
+                _takeLock.lock();
+                try
+                {
+                    _notEmpty.signal();
+                }
+                finally
+                {
+                    _takeLock.unlock();
+                }
+            }
+            return true;
+        }
+        finally
+        {
+            _putLock.unlock();
+        }
+    }
+
+    /**
+     * Removes and returns the next job, waiting for one to arrive while the queue is empty.
+     * A queued write job is always returned in preference to a queued read job.
+     * @return The job at the head of the write queue, or failing that of the read queue.
+     * @throws InterruptedException if interrupted while acquiring the lock or while waiting.
+     */
+    public Runnable take() throws InterruptedException
+    {
+        _takeLock.lockInterruptibly();
+        try
+        {
+            try
+            {
+                while (_count.get() == 0)
+                {
+                    _notEmpty.await();
+                }
+            }
+            catch (InterruptedException interrupted)
+            {
+                // Hand the wake-up on to another waiter before propagating the interrupt.
+                _notEmpty.signal();
+                throw interrupted;
+            }
+            final Job writeJob = _writeJobQueue.poll();
+            final Job next = (writeJob != null) ? writeJob : _readJobQueue.poll();
+            // The count was above one before this removal, so wake a further waiting consumer.
+            if (_count.getAndDecrement() > 1)
+            {
+                _notEmpty.signal();
+            }
+            return next;
+        }
+        finally
+        {
+            _takeLock.unlock();
+        }
+    }
+
+    /**
+     * Removes and returns the next job, waiting up to the given time for one to arrive.
+     * A queued write job is returned in preference to a queued read job.
+     * @param timeout How long to wait, in the given unit, before giving up.
+     * @param unit    The unit in which timeout is expressed.
+     * @return The next job, or null if the wait elapsed with the queue still empty.
+     * @throws InterruptedException if interrupted while acquiring the lock or while waiting.
+     */
+    public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
+    {
+        long remaining = unit.toNanos(timeout);
+        _takeLock.lockInterruptibly();
+        try
+        {
+            // Wait, within the time budget, until the job count shows something is queued.
+            while (_count.get() <= 0)
+            {
+                // Budget spent and still nothing to hand out.
+                if (remaining <= 0)
+                {
+                    return null;
+                }
+                try
+                {
+                    remaining = _notEmpty.awaitNanos(remaining);
+                }
+                catch (InterruptedException interrupted)
+                {
+                    // Hand the wake-up on to another waiter before propagating the interrupt.
+                    _notEmpty.signal();
+                    throw interrupted;
+                }
+            }
+            // Try the write queue first and fall back to the read queue only when it is empty.
+            final Job writeJob = _writeJobQueue.poll();
+            final Job next = (writeJob != null) ? writeJob : _readJobQueue.poll();
+            // The count was above one before this removal, so wake a further waiting consumer.
+            if (_count.getAndDecrement() > 1)
+            {
+                _notEmpty.signal();
+            }
+            return next;
+        }
+        finally
+        {
+            _takeLock.unlock();
+        }
+    }
+
+    public int remainingCapacity()
+    {
+        return Integer.MAX_VALUE; // the insertion methods in this class never check a capacity limit
+    }
+
+    /**
+     * Moves every queued job into the supplied collection, write jobs first and then read jobs.
+     * @param c The collection that receives the drained jobs.
+     * @return The number of jobs transferred.
+     * @throws IllegalArgumentException if asked to drain into itself, which would never terminate.
+     */
+    public int drainTo(final Collection<? super Runnable> c)
+    {
+        if(c == this)
+        {
+            throw new IllegalArgumentException("Cannot drain a queue into itself");
+        }
+        int total = 0;
+        // Hold both locks so that neither producers nor consumers can touch the queues mid-drain.
+        _putLock.lock();
+        _takeLock.lock();
+        try
+        {
+            // peek()/poll() already prefer write jobs; add before removing so a failed add() loses nothing.
+            for(Runnable job = peek(); job != null; job = peek())
+            {
+                c.add(job);
+                poll();
+                total++;
+            }
+        }
+        finally
+        {
+            _takeLock.unlock();
+            _putLock.unlock();
+        }
+        return total;
+    }
+
+    /**
+     * Moves up to maxElements queued jobs into the supplied collection, write jobs first and then read jobs.
+     *
+     * @param c           The collection that receives the drained jobs.
+     * @param maxElements The largest number of jobs to transfer; nothing is moved if this is not positive.
+     * @return The number of jobs transferred.
+     * @throws IllegalArgumentException if asked to drain into itself.
+     */
+    public int drainTo(final Collection<? super Runnable> c, final int maxElements)
+    {
+        if(c == this)
+        {
+            throw new IllegalArgumentException("Cannot drain a queue into itself");
+        }
+        int total = 0;
+        _putLock.lock();
+        _takeLock.lock();
+        try
+        {
+            // Strictly less-than: stop as soon as maxElements jobs have been moved.
+            for(Runnable job = peek(); total < maxElements && job != null; job = peek())
+            {
+                c.add(job);
+                poll();
+                total++;
+            }
+        }
+        finally
+        {
+            _takeLock.unlock();
+            _putLock.unlock();
+        }
+        return total;
+    }
+
+    /**
+     * Removes and returns the next job without waiting.
+     * A queued write job is returned in preference to a queued read job.
+     * @return The next job, or null if the job count shows the queue to be empty.
+     */
+    public Runnable poll()
+    {
+        _takeLock.lock();
+        try
+        {
+            // Nothing queued: report that straight away instead of blocking.
+            if(_count.get() <= 0)
+            {
+                return null;
+            }
+            // Try the write queue first and fall back to the read queue only when it is empty.
+            final Job writeJob = _writeJobQueue.poll();
+            final Job next = (writeJob != null) ? writeJob : _readJobQueue.poll();
+            // Keep the shared job count in step with the removal.
+            _count.decrementAndGet();
+            return next;
+        }
+        finally
+        {
+            _takeLock.unlock();
+        }
+    }
+
+    /**
+     * Returns, without removing it, the job that would be handed out next.
+     * @return The head of the write queue, else the head of the read queue, else null.
+     */
+    public Runnable peek()
+    {
+        _takeLock.lock();
+        try
+        {
+            // Write jobs take precedence, so the read queue is consulted only when no write job is waiting.
+            final Job writeHead = _writeJobQueue.peek();
+            return (writeHead != null) ? writeHead : _readJobQueue.peek();
+        }
+        finally
+        {
+            _takeLock.unlock();
+        }
+    }
+}

Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Sun May 11 08:22:03 2008
@@ -22,6 +22,9 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
@@ -111,7 +114,12 @@
         {
             if (_refCount++ == 0)
             {
-                _pool = Executors.newFixedThreadPool(_poolSize);
+//                _pool = Executors.newFixedThreadPool(_poolSize);
+
+                // Use a job queue that biases to writes
+                _pool =  new ThreadPoolExecutor(_poolSize, _poolSize,
+                                      0L, TimeUnit.MILLISECONDS,
+                                      new ReadWriteJobQueue());
             }
 
             return _pool;

Modified: incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java Sun May 11 08:22:03 2008
@@ -63,7 +63,8 @@
     {
         // If this test fails due to changes in the broker code,
         // then the constants in the Constants.java shoule be updated accordingly
-        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost,
+                                                            null);
         AMQManagedObject mbean = new AMQQueueMBean(queue);
         MBeanInfo mbeanInfo = mbean.getMBeanInfo();
 

Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Sun May 11 08:22:03 2008
@@ -34,7 +34,6 @@
 import org.apache.qpid.server.exchange.AbstractExchange;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQQueueImpl;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
@@ -207,12 +206,7 @@
         
     }
 
-	@Override
-	public Map<AMQShortString, List<AMQQueue>> getBindings() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
+	
 	public boolean isBound(AMQShortString routingKey, FieldTable arguments,
 			AMQQueue queue) {
 		// TODO Auto-generated method stub

Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Sun May 11 08:22:03 2008
@@ -7,7 +7,6 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueueImpl;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml Sun May 11 08:22:03 2008
@@ -151,7 +151,6 @@
                         <MessageReturnTest>-n MessageReturnTest org.apache.qpid.server.queue.MessageReturnTest </MessageReturnTest>
                         <QueueDepthWithSelectorTest>-n QueueDepthWithSelectorTest org.apache.qpid.server.queue.QueueDepthWithSelectorTest </QueueDepthWithSelectorTest>
                         <!--<SubscriptionManagerTest>-n SubscriptionManagerTest org.apache.qpid.server.queue.SubscriptionManagerTest </SubscriptionManagerTest>-->
-                        <SubscriptionSetTest>-n SubscriptionSetTest org.apache.qpid.server.queue.SubscriptionSetTest </SubscriptionSetTest>
                         <TimeToLiveTest>-n TimeToLiveTest org.apache.qpid.server.queue.TimeToLiveTest </TimeToLiveTest>
                         <TxnBufferTest>-n TxnBufferTest org.apache.qpid.server.txn.TxnBufferTest </TxnBufferTest>
                         <!--<TxnTest>-n TxnTest org.apache.qpid.server.txn.TxnTest </TxnTest>-->

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Sun May 11 08:22:03 2008
@@ -26,13 +26,16 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 
@@ -100,12 +103,16 @@
         private final List<Long> _unacked;
         private StoreContext _storeContext = new StoreContext();
 
-        Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws AMQException
+        Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
         {
             TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
                                                                           _storeContext, null,
                                                                           new LinkedList<RequiredDeliveryException>()
             );
+            AMQQueue queue =
+                    AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("", new MemoryMessageStore()),
+                                                       null);
+
             for (int i = 0; i < messageCount; i++)
             {
                 long deliveryTag = i + 1;
@@ -140,7 +147,7 @@
                 };
 
                 TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
-                _map.add(deliveryTag, new QueueEntryImpl(null,message, Long.MIN_VALUE));
+                _map.add(deliveryTag, queue.enqueue(new StoreContext(), message));
             }
             _acked = acked;
             _unacked = unacked;

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun May 11 08:22:03 2008
@@ -33,6 +33,7 @@
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.log4j.Logger;
 
@@ -236,7 +237,7 @@
         return properties;
     }
 
-    static class TestQueue extends AMQQueueImpl
+    static class TestQueue extends SimpleAMQQueue
     {
         final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
 
@@ -250,12 +251,160 @@
          * not invoked. It is unnecessary since for this test we only care to know whether the message was
          * sent to the queue; the queue processing logic is not being tested.
          * @param msg
-         * @param deliverFirst
          * @throws AMQException
          */
-        public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
+        @Override
+        public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
         {
-            messages.add( new HeadersExchangeTest.Message(msg.getMessage()));
+            messages.add( new HeadersExchangeTest.Message(msg));
+            return new QueueEntry()
+            {
+
+                public AMQQueue getQueue()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public AMQMessage getMessage()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public long getSize()
+                {
+                    return 0;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean getDeliveredToConsumer()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean expired() throws AMQException
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isAcquired()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean acquire()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean acquire(Subscription sub)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean delete()
+                {
+                    return false;
+                }
+
+                public boolean isDeleted()
+                {
+                    return false;
+                }
+
+                public boolean acquiredBySubscription()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void setDeliveredToSubscription()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void release()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public String debugIdentity()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean immediateAndNotDelivered()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void setRedelivered(boolean b)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public Subscription getDeliveredSubscription()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void reject()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void reject(Subscription subscription)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isRejectedBy(Subscription subscription)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void requeue(StoreContext storeContext) throws AMQException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void dispose(final StoreContext storeContext) throws MessageCleanupException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void restoreCredit()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void discard(StoreContext storeContext) throws AMQException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isQueueDeleted()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void addStateChangeListener(StateChangeListener listener)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean removeStateChangeListener(StateChangeListener listener)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public int compareTo(final QueueEntry o)
+                {
+                    return 0;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+            };
         }
 
         boolean isInQueue(Message msg)

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Sun May 11 08:22:03 2008
@@ -59,7 +59,7 @@
                                                                    false,
                                                                    new AMQShortString("test"),
                                                                    true,
-                                                                   _protocolSession.getVirtualHost());
+                                                                   _protocolSession.getVirtualHost(), null);
         AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
         channel.setDefaultQueue(queue);
         _protocolSession.addChannel(channel);

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Sun May 11 08:22:03 2008
@@ -78,7 +78,8 @@
 
         _protocolSession.addChannel(_channel);
 
-        _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
+        _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
+                                                    null);
 
     }
 

Added: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java Sun May 11 08:22:03 2008
@@ -0,0 +1,171 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/** Verifies that a queue declared with "x-qpid-priorities" delivers higher priorities first and is FIFO within a priority. */
+public class PriorityTest extends TestCase
+{
+    private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+
+    /** Broker URL; when it uses the vm:// transport the test starts and later kills its own in-VM broker. */
+    protected final String BROKER = "vm://:1";
+    protected final String VHOST = "/test";
+    protected final String QUEUE = "PriorityQueue";
+
+    /** Number of priority levels requested for the queue; sent messages cycle through 0..PRIORITIES-1. */
+    private static final int PRIORITIES = 10;
+
+    /** Number of messages sent, all of which must be received back. */
+    private static final int MSG_COUNT = 50;
+
+    /** Starts an in-VM broker when BROKER uses the vm:// transport. */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        if (usingInVMBroker())
+        {
+            TransportConnection.createVMBroker(1);
+        }
+    }
+
+    private boolean usingInVMBroker()
+    {
+        return BROKER.startsWith("vm://");
+    }
+
+    /** Kills every in-VM broker when this test started one. */
+    protected void tearDown() throws Exception
+    {
+        if (usingInVMBroker())
+        {
+            TransportConnection.killAllVMBrokers();
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Sends MSG_COUNT messages whose priorities cycle through 0..PRIORITIES-1 to a priority queue, then
+     * consumes them and checks that priorities never increase and that, within one priority, the "msg"
+     * sequence number increases (i.e. publication order is preserved).
+     */
+    public void testPriority() throws JMSException, NamingException, AMQException
+    {
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
+        env.put("queue.queue", QUEUE);
+
+        Context context = factory.getInitialContext(env);
+
+        Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Declare the queue with the "x-qpid-priorities" argument so that it is created as a priority queue
+        final FieldTable arguments = new FieldTable();
+        arguments.put(new AMQShortString("x-qpid-priorities"), PRIORITIES);
+
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+
+        Queue queue = (Queue) context.lookup("queue");
+
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+
+        producerConnection.start();
+
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.setPriority(msg % PRIORITIES);
+            producer.send(nextMessage(msg, false, producerSession, producer));
+        }
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        try
+        {
+            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+            consumerConnection.start();
+
+            int receivedCount = 0;
+            Message previous = null;
+            Message received;
+            // Drain the queue: a receive that times out after one second is taken to mean no messages are left
+            while ((received = consumer.receive(1000)) != null)
+            {
+                if (previous != null)
+                {
+                    assertTrue("Messages arrived in unexpected order", (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
+                }
+
+                previous = received;
+                receivedCount++;
+            }
+
+            assertEquals("Incorrect number of message received", MSG_COUNT, receivedCount);
+        }
+        finally
+        {
+            // The producer side is already closed above; closing the connection also closes its
+            // session and consumer, so the consumer side is released even when an assertion fails.
+            consumerConnection.close();
+        }
+    }
+
+    /** Creates a text message carrying its sequence number in the "msg" int property. */
+    private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+    {
+        Message send = producerSession.createTextMessage("Message: " + msg);
+        send.setIntProperty("msg", msg);
+
+        return send;
+    }
+
+
+}

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun May 11 08:22:03 2008
@@ -106,12 +106,12 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public Object getQueueContext()
+    public QueueEntry getLastSeenEntry()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public boolean setQueueContext(Object expected, Object newValue)
+    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
     {
         return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -126,7 +126,7 @@
         //no-op
     }
 
-    public AMQShortString getConumerTag()
+    public AMQShortString getConsumerTag()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -141,6 +141,11 @@
         return null;
     }
 
+    public QueueEntry.SubscriptionAcquiredState getOwningState()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void queueDeleted(AMQQueue queue)
     {
     }

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Sun May 11 08:22:03 2008
@@ -25,7 +25,6 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.AMQQueueImpl;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;

Added: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java Sun May 11 08:22:03 2008
@@ -0,0 +1,209 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import java.util.Enumeration;
+
+public class FlowControlTest extends VMTestCase
+{
+    private static final Logger _logger = Logger.getLogger(FlowControlTest.class);
+
+    private Connection _clientConnection;
+    private Session _clientSession;
+    private Queue _queue;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+    }
+
+    /**
+     * Checks flow control with a single consumer that calls setPrefecthLimits(0, 256), which this test
+     * treats as a 256 byte prefetch limit: the two 128 byte messages are delivered, and the 256 byte
+     * third message must be held back until both of them have been acknowledged.
+     */
+    public void testBasicBytesFlowControl() throws JMSException, NamingException, AMQException
+    {
+        _queue = new AMQQueue("amq.direct","testqueue");
+
+        //Create Client
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        _clientSession.createConsumer(_queue).close();
+
+        Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        BytesMessage m1 = producerSession.createBytesMessage();
+        m1.writeBytes(new byte[128]);
+        m1.setIntProperty("msg",1);
+        producer.send(m1);
+        BytesMessage m2 = producerSession.createBytesMessage();
+        m2.writeBytes(new byte[128]);
+        m2.setIntProperty("msg",2);
+        producer.send(m2);
+        BytesMessage m3 = producerSession.createBytesMessage();
+        m3.writeBytes(new byte[256]);
+        m3.setIntProperty("msg",3);
+        producer.send(m3);
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession)consumerSession).setPrefecthLimits(0,256);
+        MessageConsumer recv = consumerSession.createConsumer(_queue);
+        consumerConnection.start();
+
+        Message r1 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+        Message r2 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+        // 128 + 128 bytes are expected to be outstanding, so the 256 byte third message must not arrive yet
+        Message r3 = recv.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r1.acknowledge();
+
+        // the second message is expected to be still unacknowledged, leaving no room for 256 more bytes
+        r3 = recv.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r2.acknowledge();
+
+        r3 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Third message not received", r3);
+        assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+        r3.acknowledge();
+        recv.close();
+        consumerSession.close();
+        consumerConnection.close();
+
+        // release the connection that was only opened to make sure the queue exists
+        _clientConnection.close();
+    }
+
+    /**
+     * Checks flow control with two consumers on one queue, each on its own session configured with
+     * setPrefecthLimits(0, 256): the first consumer takes the 128 byte first message but not the 256 byte
+     * second one, which goes to the second consumer; the 128 byte third message then goes to the first.
+     */
+    public void testTwoConsumersBytesFlowControl() throws JMSException, NamingException, AMQException
+    {
+        _queue = new AMQQueue("amq.direct","testqueue1");
+
+        //Create Client
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        _clientSession.createConsumer(_queue).close();
+
+        Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        BytesMessage m1 = producerSession.createBytesMessage();
+        m1.writeBytes(new byte[128]);
+        m1.setIntProperty("msg",1);
+        producer.send(m1);
+        BytesMessage m2 = producerSession.createBytesMessage();
+        m2.writeBytes(new byte[256]);
+        m2.setIntProperty("msg",2);
+        producer.send(m2);
+        BytesMessage m3 = producerSession.createBytesMessage();
+        m3.writeBytes(new byte[128]);
+        m3.setIntProperty("msg",3);
+        producer.send(m3);
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession)consumerSession1).setPrefecthLimits(0,256);
+        MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+        consumerConnection.start();
+
+        Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+        Message r2 = recv1.receiveNoWait();
+        assertNull("Second message incorrectly delivered", r2);
+
+        Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession)consumerSession2).setPrefecthLimits(0,256);
+        MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
+
+        r2 = recv2.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+        Message r3 = recv2.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r3 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Third message not received", r3);
+        assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+        r2.acknowledge();
+        r3.acknowledge();
+        recv1.close();
+        recv2.close();
+        consumerSession1.close();
+        consumerSession2.close();
+        consumerConnection.close();
+        _clientConnection.close();
+    }
+}