You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/05/22 13:35:11 UTC

svn commit: r540533 - /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/

Author: rupertlssmith
Date: Tue May 22 04:35:10 2007
New Revision: 540533

URL: http://svn.apache.org/viewvc?view=rev&rev=540533
Log:
Added batch synched queue implementation.

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java Tue May 22 04:35:10 2007
@@ -0,0 +1,13 @@
+package org.apache.qpid.util.concurrent;
+
+/**
+ * Used to signal that a data element and its producer cannot be requeued or sent an error message when using a
+ * {@link BatchSynchQueue} because the producer has already been unblocked by an unblocking take on the queue.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Signal that an unblocking take has already occurred.
+ * </table>
+ */
+public class AlreadyUnblockedException extends RuntimeException
+{ }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java Tue May 22 04:35:10 2007
@@ -0,0 +1,101 @@
+package org.apache.qpid.util.concurrent;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this
+ * pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform usefull
+ * work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is
+ * consumed or until a consumer chooses to release the producer some time after consuming the data from the queue.
+ *
+ * <p>There are a number of possible advantages to using this technique when compared with having the producers
+ * processing their own data:
+ *
+ * <ul>
+ * <li>Data may be deposited asynchronously in the buffer allowing the producers to continue running.</li>
+ * <li>Data may be deposited synchronously in the buffer so that producers wait until their data has been processed
+ *     before being allowed to continue.</li>
+ * <li>Variable rates of production/consumption can be smoothed over by the buffer as it provides space in memory to
+ *     hold data between production and consumption.</li>
+ * <li>Consumers may be able to batch data as they consume it leading to more efficient consumption over
+ *     individual data item consumption where latency associated with the consume operation can be ammortized.
+ *     For example, it may be possibly to ammortize the cost of a disk seek over many producers.</li>
+ * <li>Data from seperate threads can be combined together in the buffer, providing a convenient way of spreading work
+ *     amongst many workers and gathering the results together again.</li>
+ * <li>Different types of queue can be used to hold the buffer, resulting in different processing orders. For example,
+ *     lifo, fifo, priority heap, etc.</li>
+ * </ul>
+ *
+ * <p/>The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package
+ * (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the
+ * blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional
+ * take methods that can be used to take data from a queue without releasing producers, so that consumers have an
+ * opportunity to confirm correct processing of the data before producers are released. It also adds a put method with
+ * exceptions so that consumers can signal exception cases back to producers where there are errors in the data.
+ *
+ * <p/>This type of queue is usefull in situations where consumers can obtain an efficiency gain by batching data
+ * from many threads but where synchronous handling of that data is neccessary because producers need to know that
+ * their data has been processed before they continue. For example, sending a bundle of messages together, or writing
+ * many records to disk at once, may result in improved performance but the originators of the messages or disk records
+ * need confirmation that their data has really been sent or saved to disk.
+ *
+ * <p/>The consumer can put an element back onto the queue or send an error message to the elements producer using the
+ * {@link SynchRecord} interface.
+ *
+ * <p/>The {@link #take()}, {@link #drainTo(java.util.Collection<? super E>)}  and
+ * {@link #drainTo(java.util.Collection<? super E>, int)} methods from {@link BlockingQueue} should behave as if they
+ * have been called with unblock set to false. That is they take elements from the queue but leave the producers
+ * blocked. These methods do not return collections of {@link SynchRecord}s so they do not supply an interface through
+ * which errors or re-queuings can be applied. If these methods are used then the consumer must succesfully process
+ * all the records it takes.
+ *
+ * <p/>The {@link #put} method should silently swallow any exceptions that consumers attempt to return to the caller.
+ * In order to handle exceptions the {@link #tryPut} method must be used.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Handle synchronous puts, with possible exceptions.
+ * <tr><td> Allow consumers to take many records from a queue in a batch.
+ * <tr><td> Allow consumers to decide when to unblock synchronous producers.
+ * </table>
+ */
+public interface BatchSynchQueue<E> extends BlockingQueue<E>
+{
+    /**
+     * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
+     * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
+     *
+     * @param e The data element to put into the queue.
+     *
+     * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
+     *                              on its entry in the queue being consumed.
+     * @throws SynchException       If a consumer encounters an error whilst processing the data element.
+     */
+    public void tryPut(E e) throws InterruptedException, SynchException;
+
+    /**
+     * Takes all available data items from the queue or blocks until some become available. The returned items
+     * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+     * producers, where the producers are still blocked.
+     *
+     * @param c       The collection to drain the data items into.
+     * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+     *
+     * @return A count of the number of elements that were drained from the queue.
+     */
+    public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock);
+
+    /**
+     * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
+     * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+     * producers, where the producers are still blocked.
+     *
+     * @param c           The collection to drain the data items into.
+     * @param maxElements The maximum number of elements to drain.
+     * @param unblock     If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+     *
+     * @return A count of the number of elements that were drained from the queue.
+     */
+    public SynchRef drainTo(Collection<SynchRecord<E>> c, int maxElements, boolean unblock);
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java Tue May 22 04:35:10 2007
@@ -0,0 +1,816 @@
+package org.apache.qpid.util.concurrent;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
+ * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being
+ * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and
+ * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is
+ * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch.
+ * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.
+ *
+ * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete
+ * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract},
+ * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement
+ * the buffer other than by a queue, for example, by using an array.
+ *
+ * <p/>Normal queue methods to work asynchronously.
+ * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
+ * when their data is taken.
+ * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
+ * option to keep producers blocked until the consumer decides to release them.
+ *
+ * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to
+ * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency
+ * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io)
+ * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the
+ * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an
+ * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to
+ * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @todo To create zero garbage collecting implemention will need to adapt the queue element containers
+ *       (SynchRefImpl) in such a way that one is needed per array element, they can be taken from/put back/cleared in
+ *       the queue without actually being moved from the array and they implement a way of forming them into a
+ *       collection (or Iterable) to pass to consumers (using a linked list scheme?). May not be worth the trouble.
+ */
+public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
+{
+    /** Used for logging. */
+    private static final Logger log = Logger.getLogger(BatchSynchQueueBase.class);
+
+    /** Holds a reference to the queue implementation that holds the buffer. */
+    Queue<SynchRecordImpl<E>> buffer;
+
+    /** Holds the number of items in the queue */
+    private int count;
+
+    /** Main lock guarding all access */
+    private ReentrantLock lock;
+
+    /** Condition for waiting takes */
+    private Condition notEmpty;
+
+    /** Condition for waiting puts */
+    private Condition notFull;
+
+    /**
+     * Creates a batch synch queue without fair thread scheduling.
+     */
+    public BatchSynchQueueBase()
+    {
+        this(false);
+    }
+
+    /**
+     * Ensures that the underlying buffer implementation is created.
+     *
+     * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer.
+     */
+    public BatchSynchQueueBase(boolean fair)
+    {
+        buffer = this.createQueue();
+
+        // Create the buffer lock with the fairness flag set accordingly.
+        lock = new ReentrantLock(fair);
+
+        // Create the non-empty and non-full condition monitors on the buffer lock.
+        notEmpty = lock.newCondition();
+        notFull = lock.newCondition();
+    }
+
+    /**
+     * Returns an iterator over the elements contained in this collection.
+     *
+     * @return An iterator over the elements contained in this collection.
+     */
+    public Iterator<E> iterator()
+    {
+        throw new RuntimeException("Not implemented.");
+    }
+
+    /**
+     * Returns the number of elements in this collection.  If the collection contains more than
+     * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>.
+     *
+     * @return The number of elements in this collection.
+     */
+    public int size()
+    {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+
+        try
+        {
+            return count;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Inserts the specified element into this queue, if possible. When using queues that may impose insertion
+     * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method
+     * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception.
+     *
+     * @param e The element to insert.
+     *
+     * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
+     */
+    public boolean offer(E e)
+    {
+        if (e == null)
+        {
+            throw new NullPointerException();
+        }
+
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+
+        try
+        {
+            return insert(e, false);
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to
+     * become available.
+     *
+     * @param e       The element to add.
+     * @param timeout How long to wait before giving up, in units of <tt>unit</tt>
+     * @param unit    A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
+     *
+     * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is
+     *         available.
+     *
+     * @throws InterruptedException If interrupted while waiting.
+     * @throws NullPointerException If the specified element is <tt>null</tt>.
+     */
+    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
+    {
+        if (e == null)
+        {
+            throw new NullPointerException();
+        }
+
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+
+        long nanos = unit.toNanos(timeout);
+
+        try
+        {
+            do
+            {
+                if (insert(e, false))
+                {
+                    return true;
+                }
+
+                try
+                {
+                    nanos = notFull.awaitNanos(nanos);
+                }
+                catch (InterruptedException ie)
+                {
+                    notFull.signal(); // propagate to non-interrupted thread
+                    throw ie;
+                }
+            }
+            while (nanos > 0);
+
+            return false;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty.
+     *
+     * @return The head of this queue, or <tt>null</tt> if this queue is empty.
+     */
+    public E poll()
+    {
+        final ReentrantLock lock = this.lock;
+
+        lock.lock();
+        try
+        {
+            if (count == 0)
+            {
+                return null;
+            }
+
+            E x = extract(true, true).getElement();
+
+            return x;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements
+     * are present on this queue.
+     *
+     * @param timeout How long to wait before giving up, in units of <tt>unit</tt>.
+     * @param unit    A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
+     *
+     * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present.
+     *
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try
+        {
+            long nanos = unit.toNanos(timeout);
+
+            do
+            {
+                if (count != 0)
+                {
+                    E x = extract(true, true).getElement();
+
+                    return x;
+                }
+
+                try
+                {
+                    nanos = notEmpty.awaitNanos(nanos);
+                }
+                catch (InterruptedException ie)
+                {
+                    notEmpty.signal(); // propagate to non-interrupted thread
+                    throw ie;
+                }
+            }
+            while (nanos > 0);
+
+            return null;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty.
+     *
+     * @return The head of this queue, or <tt>null</tt> if this queue is empty.
+     */
+    public E peek()
+    {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+
+        try
+        {
+            return peekAtBufferHead();
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints)
+     * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit.
+     *
+     * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by
+     * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt>
+     * or <tt>take</tt> an element.
+     *
+     * @return The remaining capacity.
+     */
+    public int remainingCapacity()
+    {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+
+        try
+        {
+            return getBufferCapacity() - count;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Adds the specified element to this queue, waiting if necessary for space to become available.
+     *
+     * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised
+     * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these
+     * exceptions.
+     *
+     * @param e The element to add.
+     *
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    public void put(E e) throws InterruptedException
+    {
+        try
+        {
+            tryPut(e);
+        }
+        catch (SynchException ex)
+        {
+            // This exception is deliberately ignored. See the method comment for information about this.
+        }
+    }
+
+    /**
+     * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
+     * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
+     *
+     * @param e The data element to put into the queue.
+     *
+     * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
+     *                              on its entry in the queue being consumed.
+     * @throws SynchException       If a consumer encounters an error whilst processing the data element.
+     */
+    public void tryPut(E e) throws InterruptedException, SynchException
+    {
+        if (e == null)
+        {
+            throw new NullPointerException();
+        }
+
+        // final Queue<E> items = this.buffer;
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+
+        try
+        {
+            while (count == getBufferCapacity())
+            {
+                // Release the lock and wait until the queue is not full.
+                notFull.await();
+            }
+        }
+        catch (InterruptedException ie)
+        {
+            notFull.signal(); // propagate to non-interrupted thread
+            throw ie;
+        }
+
+        // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block
+        // the producer until its data is taken.
+        insert(e, true);
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
+     * Any producer that has its data element taken by this call will be immediately unblocked. To keep the
+     * producer blocked whilst taking just a single item, use the
+     * {@link #drainTo(java.util.Collection<uk.co.thebadgerset.common.util.concurrent.SynchRecord<E>>, int, boolean)}
+     * method. There is no take method to do that because there is not usually any advantage in a synchronous hand
+     * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption
+     * latencies accross many producers where possible.
+     *
+     * @return The head of this queue.
+     *
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    public E take() throws InterruptedException
+    {
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+
+        try
+        {
+            try
+            {
+                while (count == 0)
+                {
+                    // Release the lock and wait until the queue becomes non-empty.
+                    notEmpty.await();
+                }
+            }
+            catch (InterruptedException ie)
+            {
+                notEmpty.signal(); // propagate to non-interrupted thread
+                throw ie;
+            }
+
+            // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is
+            // not full, and unblock the producer that owns the data item that is taken.
+            E x = extract(true, true).getElement();
+
+            return x;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes all available elements from this queue and adds them into the given collection.  This operation may be
+     * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements
+     * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated
+     * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further,
+     * the behavior of this operation is undefined if the specified collection is modified while the operation is in
+     * progress.
+     *
+     * @param objects The collection to transfer elements into.
+     *
+     * @return The number of elements transferred.
+     *
+     * @throws NullPointerException     If objects is null.
+     * @throws IllegalArgumentException If objects is this queue.
+     */
+    public int drainTo(Collection<? super E> objects)
+    {
+        return drainTo(objects, -1);
+    }
+
+    /**
+     * Removes at most the given number of available elements from this queue and adds them into the given collection.
+     * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements
+     * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue
+     * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if
+     * the specified collection is modified while the operation is in progress.
+     *
+     * @param objects     The collection to transfer elements into.
+     * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning
+     *                    all elements.
+     *
+     * @return The number of elements transferred.
+     *
+     * @throws NullPointerException     If c is null.
+     * @throws IllegalArgumentException If c is this queue.
+     */
+    public int drainTo(Collection<? super E> objects, int maxElements)
+    {
+        if (objects == null)
+        {
+            throw new NullPointerException();
+        }
+
+        if (objects == this)
+        {
+            throw new IllegalArgumentException();
+        }
+
+        // final Queue<E> items = this.buffer;
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+
+        try
+        {
+            int n = 0;
+
+            for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
+            {
+                // Take items from the queue, do unblock the producers, but don't send not full signals yet.
+                objects.add(extract(true, false).getElement());
+            }
+
+            if (n > 0)
+            {
+                // count -= n;
+                notFull.signalAll();
+            }
+
+            return n;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Takes all available data items from the queue or blocks until some become available. The returned items
+     * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+     * producers, where the producers are still blocked.
+     *
+     * @param c       The collection to drain the data items into.
+     * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+     *
+     * @return A count of the number of elements that were drained from the queue.
+     */
+    public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
+    {
+        return drainTo(c, -1, unblock);
+    }
+
+    /**
+     * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
+     * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+     * producers, where the producers are still blocked.
+     *
+     * @param coll        The collection to drain the data items into.
+     * @param maxElements The maximum number of elements to drain.
+     * @param unblock     If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+     *
+     * @return A count of the number of elements that were drained from the queue.
+     */
+    public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
+    {
+        if (coll == null)
+        {
+            throw new NullPointerException();
+        }
+
+        // final Queue<E> items = this.buffer;
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+
+        try
+        {
+            int n = 0;
+
+            for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
+            {
+                // Extract the next record from the queue, don't signall the not full condition yet and release
+                // producers depending on whether the caller wants to or not.
+                coll.add(extract(false, unblock));
+            }
+
+            if (n > 0)
+            {
+                // count -= n;
+                notFull.signalAll();
+            }
+
+            return new SynchRefImpl(n, coll);
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * This abstract method should be overriden to return an empty queue. Different implementations of producer
+     * consumer buffers can control the order in which data is accessed using different queue implementations.
+     * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete
+     * implementations.
+     *
+     * @return An empty queue.
+     */
+    protected abstract <T> Queue<T> createQueue();
+
+    /**
+     * Insert element into the queue, then possibly signal that the queue is not empty and block the producer
+     * on the element until permission to procede is given.
+     *
+     * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process
+     * will be able to get access to the queue. Hence, unlock and block are always set together.
+     *
+     * <p/>Call only when holding the global lock.
+     *
+     * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked.
+     *
+     * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this
+     *         method may not return straight away, but only after the producer is unblocked by having its data
+     *         consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no
+     *         matter what value the unlockAndBlock flag has, leaving the global lock on.
+     */
+    protected boolean insert(E x, boolean unlockAndBlock)
+    {
+        // Create a new record for the data item.
+        SynchRecordImpl<E> record = new SynchRecordImpl<E>(x);
+
+        boolean result = buffer.offer(record);
+
+        if (result)
+        {
+            count++;
+
+            // Tell any waiting consumers that the queue is not empty.
+            notEmpty.signal();
+
+            if (unlockAndBlock)
+            {
+                // Allow other threads to read/write the queue.
+                lock.unlock();
+
+                // Wait until a consumer takes this data item.
+                record.waitForConsumer();
+            }
+
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    /**
+     * Extract element at current take position, advance, and signal.
+     *
+     * <p/>Call only when holding lock.
+     */
+    protected SynchRecordImpl<E> extract(boolean unblock, boolean signal)
+    {
+        SynchRecordImpl<E> result = buffer.remove();
+        count--;
+
+        if (signal)
+        {
+            notFull.signal();
+        }
+
+        if (unblock)
+        {
+            result.releaseImmediately();
+        }
+
+        return result;
+    }
+
+    /**
+     * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned.
+     *
+     * <p/>Call only when holding lock.
+     *
+     * @return The maximum capacity of the buffer.
+     */
+    protected int getBufferCapacity()
+    {
+        if (buffer instanceof Capacity)
+        {
+            return ((Capacity) buffer).getCapacity();
+        }
+        else
+        {
+            return Integer.MAX_VALUE;
+        }
+    }
+
+    /**
+     * Return the head element from the buffer.
+     *
+     * <p/>Call only when holding lock.
+     *
+     * @return The head element from the buffer.
+     */
+    protected E peekAtBufferHead()
+    {
+        return buffer.peek().getElement();
+    }
+
+    public class SynchRefImpl implements SynchRef
+    {
+        /** Holds the number of synch records associated with this reference. */
+        int numRecords;
+
+        /** Holds a reference to the collection of synch records managed by this. */
+        Collection<SynchRecord<E>> records;
+
+        public SynchRefImpl(int n, Collection<SynchRecord<E>> records)
+        {
+            this.numRecords = n;
+            this.records = records;
+        }
+
+        public int getNumRecords()
+        {
+            return numRecords;
+        }
+
+        /**
+         * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked
+         * when this method is called. The exception to this is producers that have had their data put back onto the queue
+         * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked
+         * but will not return from their put call normally, but with an exception instead.
+         */
+        public void unblockProducers()
+        {
+            log.debug("public void unblockProducers(): called");
+
+            if (records != null)
+            {
+                for (SynchRecord<E> record : records)
+                {
+                    // This call takes account of items that have already been released, are to be requeued or are in
+                    // error.
+                    record.releaseImmediately();
+                }
+            }
+
+            records = null;
+        }
+    }
+
+    /**
+     * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows
+     * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when
+     * its data cannot be consumed.
+     */
+    public class SynchRecordImpl<E> implements SynchRecord<E>
+    {
+        /** A boolean latch that determines when the producer for this data item will be allowed to continue. */
+        BooleanLatch latch = new BooleanLatch();
+
+        /** The data element associated with this item. */
+        E element;
+
+        /**
+         * Create a new synch record.
+         *
+         * @param e The data element that the record encapsulates.
+         */
+        public SynchRecordImpl(E e)
+        {
+            // Keep the data element.
+            element = e;
+        }
+
+        /**
+         * Waits until the producer is given permission to proceded by a consumer.
+         */
+        public void waitForConsumer()
+        {
+            latch.await();
+        }
+
+        /**
+         * Gets the data element contained by this record.
+         *
+         * @return The data element contained by this record.
+         */
+        public E getElement()
+        {
+            return element;
+        }
+
+        /**
+         * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
+         * producers to a minimum by using this method to release them at the earliest possible moment when batch
+         * consuming records from sychronized producers.
+         */
+        public void releaseImmediately()
+        {
+            // Check that the record has not already been released, is in error or is to be requeued.
+            latch.signal();
+
+            // Propagate errors to the producer.
+
+            // Requeue items to be requeued.
+        }
+
+        /**
+         * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
+         * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or
+         * the {@link #releaseImmediately()} method.
+         *
+         * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
+         * element has already been unblocked.
+         */
+        public void reQueue()
+        {
+            throw new RuntimeException("Not implemented.");
+        }
+
+        /**
+         * Tells the synch queue to raise an exception with this elements producer. The exception is not raised
+         * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the
+         * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is
+         * raised on the producer.
+         *
+         * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
+         * because the exception is to be passed onto a different thread.
+         *
+         * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
+         * element has already been unblocked.
+         *
+         * @param e The exception to raise on the producer.
+         */
+        public void inError(Exception e)
+        {
+            throw new RuntimeException("Not implemented.");
+        }
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java Tue May 22 04:35:10 2007
@@ -0,0 +1,107 @@
+package org.apache.qpid.util.concurrent;
+
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * A BooleanLatch is like a set of traffic lights, where threads can wait at a red light until another thread gives
+ * the green light. When threads arrive at the latch it is initially red. They queue up until the green signal is
+ * given, at which point they can all acquire the latch in shared mode and continue to run concurrently. Once the latch
+ * is signalled it cannot be reset to red again.
+ *
+ * <p/> The latch uses a {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} to implement its synchronization.
+ * This has two internal states, 0 which means that the latch is blocked, and 1 which means that the latch is open.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Block threads until a go signal is given.
+ * </table>
+ *
+ * @todo Might be better to use a countdown latch to count down from 1. Its await method can throw interrupted
+ *       exception which makes the possibility of interruption more explicit, and provides a reminder to recheck the
+ *       latch condition before continuing.
+ */
+public class BooleanLatch
+{
+    /** Holds the synchronizer that provides the thread queueing synchronization. */
+    private final Sync sync = new Sync();
+
+    /**
+     * Tests whether or not the latch has been signalled, that is to say that, the light is green.
+     *
+     * <p/>This method is non-blocking.
+     *
+     * @return <tt>true</tt> if the latch may be acquired; the light is green.
+     */
+    public boolean isSignalled()
+    {
+        return sync.isSignalled();
+    }
+
+    /**
+     * Waits on the latch until the signal is given and the light is green. If the light is already green then the
+     * latch will be acquired and the thread will not have to wait.
+     *
+     * <p/>This method will block until the go signal is given or the thread is otherwise interrupted. Before carrying
+     * out any processing threads that return from this method should confirm that the go signal has really been given
+     * on this latch by calling the {@link #isSignalled()} method.
+     */
+    public void await()
+    {
+        sync.acquireShared(1);
+    }
+
+    /**
+     * Releases any threads currently waiting on the latch. This flips the light to green allowing any threads that
+     * were waiting for this condition to now run.
+     *
+     * <p/>This method is non-blocking.
+     */
+    public void signal()
+    {
+        sync.releaseShared(1);
+    }
+
+    /**
+     * Implements a thread queued synchronizer. The internal state 0 means that the queue is blocked and the internl
+     * state 1 means that the queue is released and that all waiting threads can acquire the synchronizer in shared
+     * mode.
+     */
+    private static class Sync extends AbstractQueuedSynchronizer
+    {
+        /**
+         * Attempts to acquire this synchronizer in shared mode. It may be acquired once it has been released.
+         *
+         * @param ignore This parameter is ignored.
+         *
+         * @return 1 if the shared acquisition succeeds and -1 if it fails.
+         */
+        protected int tryAcquireShared(int ignore)
+        {
+            return isSignalled() ? 1 : -1;
+        }
+
+        /**
+         * Releases the synchronizer, setting its internal state to 1.
+         *
+         * @param ignore This parameter is ignored.
+         *
+         * @return <tt>true</tt> always.
+         */
+        protected boolean tryReleaseShared(int ignore)
+        {
+            setState(1);
+
+            return true;
+        }
+
+        /**
+         * Tests if the synchronizer is signalled. It is signalled when its internal state it 1.
+         *
+         * @return <tt>true</tt> if the internal state is 1, <tt>false</tt> otherwise.
+         */
+        boolean isSignalled()
+        {
+            return getState() != 0;
+        }
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java Tue May 22 04:35:10 2007
@@ -0,0 +1,14 @@
+package org.apache.qpid.util.concurrent;
+
+/**
+ * An interface exposed by data structures that have a maximum capacity.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Report the maximum capacity.
+ * </table>
+ */
+public interface Capacity
+{
+    public int getCapacity();
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java Tue May 22 04:35:10 2007
@@ -0,0 +1,29 @@
+package org.apache.qpid.util.concurrent;
+
+import java.util.Queue;
+
+/**
+ * SynchBuffer completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying
+ * queue as an array. This uses FIFO ordering for the queue but restricts the maximum size of the queue to a fixed
+ * amount. It also has the advantage that, as the buffer does not grow and shrink dynamically, memory for the buffer
+ * is allocated up front and does not create garbage during the operation of the queue.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide array based FIFO queue to create a batch synched queue around.
+ * </table>
+ *
+ * @todo Write an array based buffer implementation that implements Queue.
+ */
+public class SynchBuffer<E> extends BatchSynchQueueBase<E>
+{
+    /**
+     * Returns an empty queue, implemented as an array.
+     *
+     * @return An empty queue, implemented as an array.
+     */
+    protected <T> Queue<T> createQueue()
+    {
+        throw new RuntimeException("Not implemented.");
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java Tue May 22 04:35:10 2007
@@ -0,0 +1,31 @@
+package org.apache.qpid.util.concurrent;
+
+/**
+ * SynchException is used to encapsulate exceptions with the data elements that caused them in order to send exceptions
+ * back from the consumers of a {@link BatchSynchQueue} to producers. The underlying exception should be retrieved from
+ * the {@link #getCause} method.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Encapsulate a data element and exception.
+ * </table>
+ */
+public class SynchException extends Exception
+{
+    /** Holds the data element that is in error. */
+    Object element;
+
+    /**
+     * Creates a new BaseApplicationException object.
+     *
+     * @param message The exception message.
+     * @param cause   The underlying throwable cause. This may be null.
+     */
+    public SynchException(String message, Throwable cause, Object element)
+    {
+        super(message, cause);
+
+        // Keep the data element that was in error.
+        this.element = element;
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java Tue May 22 04:35:10 2007
@@ -0,0 +1,27 @@
+package org.apache.qpid.util.concurrent;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * SynchQueue completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying
+ * queue as a linked list. This uses FIFO ordering for the queue and allows the queue to grow to accomodate more
+ * elements as needed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide linked list FIFO queue to create a batch synched queue around.
+ * </table>
+ */
+public class SynchQueue<E> extends BatchSynchQueueBase<E>
+{
+    /**
+     * Returns an empty queue, implemented as a linked list.
+     *
+     * @return An empty queue, implemented as a linked list.
+     */
+    protected <T> Queue<T> createQueue()
+    {
+        return new LinkedList<T>();
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java Tue May 22 04:35:10 2007
@@ -0,0 +1,53 @@
+package org.apache.qpid.util.concurrent;
+
+/**
+ * SynchRecord associates a data item from a {@link BatchSynchQueue} with its producer. This enables the data item data
+ * item to be put back on the queue without unblocking its producer, or to send exceptions to the producer.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Get the underlying data element.
+ * <tr><td> Put the data element back on the queue without unblocking its producer.
+ * <tr><td> Send and exception to the data elements producer.
+ * </table>
+ */
+public interface SynchRecord<E>
+{
+    /**
+     * Gets the data element contained by this record.
+     *
+     * @return The data element contained by this record.
+     */
+    public E getElement();
+
+    /**
+     * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
+     * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method.
+     *
+     * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element
+     * has already been unblocked.
+     */
+    public void reQueue();
+
+    /**
+     * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
+     * producers to a minimum by using this method to release them at the earliest possible moment when batch
+     * consuming records from sychronized producers.
+     */
+    public void releaseImmediately();
+
+    /**
+     * Tells the synch queue to raise an exception with this elements producer. The exception is not raised immediately
+     * but upon calling the {@link SynchRef#unblockProducers()} method. The exception will be wrapped in a
+     * {@link SynchException} before it is raised on the producer.
+     *
+     * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
+     * because the exception is to be passed onto a different thread.
+     *
+     * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element
+     * has already been unblocked.
+     *
+     * @param e The exception to raise on the producer.
+     */
+    public void inError(Exception e);
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java?view=auto&rev=540533
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java Tue May 22 04:35:10 2007
@@ -0,0 +1,30 @@
+package org.apache.qpid.util.concurrent;
+
+/**
+ * A SynchRef is an interface which is returned from the synchronous take and drain methods of {@link BatchSynchQueue},
+ * allowing call-backs to be made against the synchronizing strucutre. It allows the consumer to communicate when it
+ * wants producers that have their data taken to be unblocked.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Report number of records returned by a taking operation.
+ * <tr><td> Provide call-back to release producers of taken records.
+ * </table>
+ */
+public interface SynchRef
+{
+    /**
+     * Reports the number of records taken by the take or drain operation.
+     *
+     * @return The number of records taken by the take or drain operation.
+     */
+    public int getNumRecords();
+
+    /**
+     * Any producers that have had their data elements taken from the queue but have not been unblocked are
+     * unblocked when this method is called. The exception to this is producers that have had their data put back
+     * onto the queue by a consumer. Producers that have had exceptions for their data items registered by consumers
+     * will be unblocked but will not return from their put call normally, but with an exception instead.
+     */
+    public void unblockProducers();
+}