You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 17:22:05 UTC
svn commit: r655323 [4/4] - in
/incubator/qpid/branches/broker-queue-refactor/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main...
Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Sun May 11 08:22:03 2008
@@ -59,24 +59,6 @@
* <td> {@link Job}, {@link Job.JobCompletionHandler}
* </table>
*
- * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events.
- * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
- * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
- * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case?
- * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these
- * stages.
- *
- * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
- * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run
- * and trips off another batch of 10 until they are all done. Why not just have a straight forward
- * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10
- * in them, just have one queue of events and worker threads taking the next event. There will be coordination
- * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same
- * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker
- * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be
- * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily
- * be substituted in.
- *
* @todo The static helper methods are pointless. Could just call new.
*/
public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
@@ -95,17 +77,20 @@
private final int _maxEvents;
+ private final boolean _readFilter;
+
/**
* Creates a named pooling filter, on the specified shared thread pool.
*
* @param refCountingPool The thread pool reference.
* @param name The identifying name of the filter type.
*/
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
{
_poolReference = refCountingPool;
_name = name;
_maxEvents = maxEvents;
+ _readFilter = readFilter;
}
/**
@@ -166,7 +151,6 @@
void fireAsynchEvent(Job job, Event event)
{
- // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
final ExecutorService pool = _poolReference.getPool();
@@ -200,7 +184,7 @@
*/
public void createNewJobForSession(IoSession session)
{
- Job job = new Job(session, this, MAX_JOB_EVENTS);
+ Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
session.setAttribute(_name, job);
}
@@ -216,18 +200,6 @@
return (Job) session.getAttribute(_name);
}
- /*private Job createJobForSession(IoSession session)
- {
- return addJobForSession(session, new Job(session, this, _maxEvents));
- }*/
-
- /*private Job addJobForSession(IoSession session, Job job)
- {
- // atomic so ensures all threads agree on the same job
- Job existing = _jobs.putIfAbsent(session, job);
-
- return (existing == null) ? job : existing;
- }*/
/**
* Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
@@ -238,16 +210,6 @@
*/
public void completed(IoSession session, Job job)
{
- // if (job.isComplete())
- // {
- // job.release();
- // if (!job.isReferenced())
- // {
- // _jobs.remove(session);
- // }
- // }
- // else
-
if (!job.isComplete())
{
@@ -454,7 +416,7 @@
*/
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
}
/**
@@ -497,7 +459,7 @@
*/
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
}
/**
Added: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java Sun May 11 08:22:03 2008
@@ -0,0 +1,432 @@
+package org.apache.qpid.pool;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
+{
+
+ private final AtomicInteger _count = new AtomicInteger(0);
+
+ private final ReentrantLock _takeLock = new ReentrantLock();
+
+ private final Condition _notEmpty = _takeLock.newCondition();
+
+ private final ReentrantLock _putLock = new ReentrantLock();
+
+ private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>();
+
+ private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>();
+
+
+ private class ReadWriteJobIterator implements Iterator<Runnable>
+ {
+
+ private boolean _onReads;
+ private Iterator<Job> _iter = _writeJobQueue.iterator();
+
+ public boolean hasNext()
+ {
+ if(!_iter.hasNext())
+ {
+ if(_onReads)
+ {
+ _iter = _readJobQueue.iterator();
+ _onReads = true;
+ return _iter.hasNext();
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public Runnable next()
+ {
+ if(_iter.hasNext())
+ {
+ return _iter.next();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ _takeLock.lock();
+ try
+ {
+ _iter.remove();
+ _count.decrementAndGet();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ }
+
+ public Iterator<Runnable> iterator()
+ {
+ return new ReadWriteJobIterator();
+ }
+
+ public int size()
+ {
+ return _count.get();
+ }
+
+ public boolean offer(final Runnable runnable)
+ {
+ final Job job = (Job) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+ try
+ {
+ if(job.isReadJob())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+ public void put(final Runnable runnable) throws InterruptedException
+ {
+ final Job job = (Job) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isReadJob())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+
+
+ public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final Job job = (Job) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isReadJob())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+
+ }
+
+ public Runnable take() throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lockInterruptibly();
+ try
+ {
+ try
+ {
+ while (_count.get() == 0)
+ {
+ _notEmpty.await();
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+
+ Job job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = _count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+
+ }
+
+ public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ final AtomicInteger count = _count;
+ long nanos = unit.toNanos(timeout);
+ takeLock.lockInterruptibly();
+ Job job = null;
+ try
+ {
+
+ for (;;)
+ {
+ if (count.get() > 0)
+ {
+ job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ break;
+ }
+ if (nanos <= 0)
+ {
+ return null;
+ }
+ try
+ {
+ nanos = _notEmpty.awaitNanos(nanos);
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ return job;
+ }
+
+ public int remainingCapacity()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ Job job;
+ while((job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while((job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c, final int maxElements)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ Job job;
+ while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while(total<=maxElements && (job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+
+ }
+
+ public Runnable poll()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ if(_count.get() > 0)
+ {
+ Job job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ _count.decrementAndGet();
+ return job;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ }
+
+ public Runnable peek()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ Job job = _writeJobQueue.peek();
+ if(job == null)
+ {
+ job = _readJobQueue.peek();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+ }
+}
Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Sun May 11 08:22:03 2008
@@ -22,6 +22,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
@@ -111,7 +114,12 @@
{
if (_refCount++ == 0)
{
- _pool = Executors.newFixedThreadPool(_poolSize);
+// _pool = Executors.newFixedThreadPool(_poolSize);
+
+ // Use a job queue that biases to writes
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ReadWriteJobQueue());
}
return _pool;
Modified: incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java Sun May 11 08:22:03 2008
@@ -63,7 +63,8 @@
{
// If this test fails due to changes in the broker code,
// then the constants in the Constants.java shoule be updated accordingly
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost,
+ null);
AMQManagedObject mbean = new AMQQueueMBean(queue);
MBeanInfo mbeanInfo = mbean.getMBeanInfo();
Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Sun May 11 08:22:03 2008
@@ -34,7 +34,6 @@
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQQueueImpl;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
@@ -207,12 +206,7 @@
}
- @Override
- public Map<AMQShortString, List<AMQQueue>> getBindings() {
- // TODO Auto-generated method stub
- return null;
- }
-
+
public boolean isBound(AMQShortString routingKey, FieldTable arguments,
AMQQueue queue) {
// TODO Auto-generated method stub
Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Sun May 11 08:22:03 2008
@@ -7,7 +7,6 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueueImpl;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml Sun May 11 08:22:03 2008
@@ -151,7 +151,6 @@
<MessageReturnTest>-n MessageReturnTest org.apache.qpid.server.queue.MessageReturnTest </MessageReturnTest>
<QueueDepthWithSelectorTest>-n QueueDepthWithSelectorTest org.apache.qpid.server.queue.QueueDepthWithSelectorTest </QueueDepthWithSelectorTest>
<!--<SubscriptionManagerTest>-n SubscriptionManagerTest org.apache.qpid.server.queue.SubscriptionManagerTest </SubscriptionManagerTest>-->
- <SubscriptionSetTest>-n SubscriptionSetTest org.apache.qpid.server.queue.SubscriptionSetTest </SubscriptionSetTest>
<TimeToLiveTest>-n TimeToLiveTest org.apache.qpid.server.queue.TimeToLiveTest </TimeToLiveTest>
<TxnBufferTest>-n TxnBufferTest org.apache.qpid.server.txn.TxnBufferTest </TxnBufferTest>
<!--<TxnTest>-n TxnTest org.apache.qpid.server.txn.TxnTest </TxnTest>-->
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Sun May 11 08:22:03 2008
@@ -26,13 +26,16 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -100,12 +103,16 @@
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
- Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws AMQException
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
_storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
+ AMQQueue queue =
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("", new MemoryMessageStore()),
+ null);
+
for (int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
@@ -140,7 +147,7 @@
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
- _map.add(deliveryTag, new QueueEntryImpl(null,message, Long.MIN_VALUE));
+ _map.add(deliveryTag, queue.enqueue(new StoreContext(), message));
}
_acked = acked;
_unacked = unacked;
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun May 11 08:22:03 2008
@@ -33,6 +33,7 @@
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
@@ -236,7 +237,7 @@
return properties;
}
- static class TestQueue extends AMQQueueImpl
+ static class TestQueue extends SimpleAMQQueue
{
final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
@@ -250,12 +251,160 @@
* not invoked. It is unnecessary since for this test we only care to know whether the message was
* sent to the queue; the queue processing logic is not being tested.
* @param msg
- * @param deliverFirst
* @throws AMQException
*/
- public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
+ @Override
+ public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
{
- messages.add( new HeadersExchangeTest.Message(msg.getMessage()));
+ messages.add( new HeadersExchangeTest.Message(msg));
+ return new QueueEntry()
+ {
+
+ public AMQQueue getQueue()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public AMQMessage getMessage()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getSize()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean getDeliveredToConsumer()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean expired() throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isAcquired()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean acquire()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean acquire(Subscription sub)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean delete()
+ {
+ return false;
+ }
+
+ public boolean isDeleted()
+ {
+ return false;
+ }
+
+ public boolean acquiredBySubscription()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setDeliveredToSubscription()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void release()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String debugIdentity()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean immediateAndNotDelivered()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setRedelivered(boolean b)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void reject()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void reject(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isRejectedBy(Subscription subscription)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void requeue(StoreContext storeContext) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void restoreCredit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void discard(StoreContext storeContext) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isQueueDeleted()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addStateChangeListener(StateChangeListener listener)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean removeStateChangeListener(StateChangeListener listener)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int compareTo(final QueueEntry o)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
}
boolean isInQueue(Message msg)
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Sun May 11 08:22:03 2008
@@ -59,7 +59,7 @@
false,
new AMQShortString("test"),
true,
- _protocolSession.getVirtualHost());
+ _protocolSession.getVirtualHost(), null);
AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Sun May 11 08:22:03 2008
@@ -78,7 +78,8 @@
_protocolSession.addChannel(_channel);
- _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
+ null);
}
Added: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java Sun May 11 08:22:03 2008
@@ -0,0 +1,171 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */
+public class PriorityTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+
+
+ protected final String BROKER = "vm://:1";
+ protected final String VHOST = "/test";
+ protected final String QUEUE = "PriorityQueue";
+
+
+ private static final int MSG_COUNT = 50;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ if (usingInVMBroker())
+ {
+ TransportConnection.createVMBroker(1);
+ }
+
+
+ }
+
+ private boolean usingInVMBroker()
+ {
+ return BROKER.startsWith("vm://");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (usingInVMBroker())
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+ super.tearDown();
+ }
+
+ public void testPriority() throws JMSException, NamingException, AMQException
+ {
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
+ env.put("queue.queue", QUEUE);
+
+ Context context = factory.getInitialContext(env);
+
+ Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final FieldTable arguments = new FieldTable();
+ arguments.put(new AMQShortString("x-qpid-priorities"),10);
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+
+ Queue queue = (Queue) context.lookup("queue");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+
+
+
+
+
+
+ producerConnection.start();
+
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+
+
+
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.setPriority(msg % 10);
+ producer.send(nextMessage(msg, false, producerSession, producer));
+ }
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+
+ Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+
+
+
+ consumerConnection.start();
+
+ Message received;
+ //Receive Message 0
+ StringBuilder buf = new StringBuilder();
+ int receivedCount = 0;
+ Message previous = null;
+ while((received = consumer.receive(1000))!=null)
+ {
+ if(previous != null)
+ {
+ assertTrue("Messages arrived in unexpected order", (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
+ }
+
+ previous = received;
+ receivedCount++;
+ }
+
+ assertEquals("Incorrect number of message received", 50, receivedCount);
+
+ producerSession.close();
+ producer.close();
+
+ }
+
+ private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+
+
+}
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun May 11 08:22:03 2008
@@ -106,12 +106,12 @@
//To change body of implemented methods use File | Settings | File Templates.
}
- public Object getQueueContext()
+ public QueueEntry getLastSeenEntry()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public boolean setQueueContext(Object expected, Object newValue)
+ public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -126,7 +126,7 @@
//no-op
}
- public AMQShortString getConumerTag()
+ public AMQShortString getConsumerTag()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -141,6 +141,11 @@
return null;
}
+ public QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void queueDeleted(AMQQueue queue)
{
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Sun May 11 08:22:03 2008
@@ -25,7 +25,6 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.AMQQueueImpl;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
Added: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java Sun May 11 08:22:03 2008
@@ -0,0 +1,209 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import java.util.Enumeration;
+
+public class FlowControlTest extends VMTestCase
+{
+ private static final Logger _logger = Logger.getLogger(FlowControlTest.class);
+
+ private Connection _clientConnection;
+ private Session _clientSession;
+ private Queue _queue;
+
+ public void setUp() throws Exception
+ {
+
+ super.setUp();
+
+
+ }
+
+ /**
+ * Simply
+ */
+ public void testBasicBytesFlowControl() throws JMSException, NamingException, AMQException
+ {
+ _queue = new AMQQueue("amq.direct","testqueue");//(Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ BytesMessage m1 = producerSession.createBytesMessage();
+ m1.writeBytes(new byte[128]);
+ m1.setIntProperty("msg",1);
+ producer.send(m1);
+ BytesMessage m2 = producerSession.createBytesMessage();
+ m2.writeBytes(new byte[128]);
+ m2.setIntProperty("msg",2);
+ producer.send(m2);
+ BytesMessage m3 = producerSession.createBytesMessage();
+ m3.writeBytes(new byte[256]);
+ m3.setIntProperty("msg",3);
+ producer.send(m3);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+
+ Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ ((AMQSession)consumerSession).setPrefecthLimits(0,256);
+ MessageConsumer recv = consumerSession.createConsumer(_queue);
+ consumerConnection.start();
+
+ Message r1 = recv.receive(RECEIVE_TIMEOUT);
+ assertNotNull("First message not received", r1);
+ assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+ Message r2 = recv.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Second message not received", r2);
+ assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+ Message r3 = recv.receiveNoWait();
+ assertNull("Third message incorrectly delivered", r3);
+
+ r1.acknowledge();
+
+ r3 = recv.receiveNoWait();
+ assertNull("Third message incorrectly delivered", r3);
+
+ r2.acknowledge();
+
+
+ r3 = recv.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Third message not received", r3);
+ assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+ r3.acknowledge();
+ recv.close();
+ consumerSession.close();
+ consumerConnection.close();
+
+ }
+
+ public void testTwoConsumersBytesFlowControl() throws JMSException, NamingException, AMQException
+ {
+ _queue = new AMQQueue("amq.direct","testqueue1");//(Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ BytesMessage m1 = producerSession.createBytesMessage();
+ m1.writeBytes(new byte[128]);
+ m1.setIntProperty("msg",1);
+ producer.send(m1);
+ BytesMessage m2 = producerSession.createBytesMessage();
+ m2.writeBytes(new byte[256]);
+ m2.setIntProperty("msg",2);
+ producer.send(m2);
+ BytesMessage m3 = producerSession.createBytesMessage();
+ m3.writeBytes(new byte[128]);
+ m3.setIntProperty("msg",3);
+ producer.send(m3);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+
+ Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ ((AMQSession)consumerSession1).setPrefecthLimits(0,256);
+ MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+ consumerConnection.start();
+
+ Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+ assertNotNull("First message not received", r1);
+ assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+
+ Message r2 = recv1.receiveNoWait();
+ assertNull("Second message incorrectly delivered", r2);
+
+ Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ ((AMQSession)consumerSession2).setPrefecthLimits(0,256);
+ MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
+
+
+ r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT);
+ assertNotNull("Second message not received", r2);
+ assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+ Message r3 = recv2.receiveNoWait();
+ assertNull("Third message incorrectly delivered", r3);
+
+ r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT);
+ assertNotNull("Third message not received", r3);
+ assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+
+
+ r2.acknowledge();
+ r3.acknowledge();
+ recv1.close();
+ recv2.close();
+ consumerSession1.close();
+ consumerSession2.close();
+ consumerConnection.close();
+
+ }
+
+}