You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/05/22 15:45:52 UTC

svn commit: r540584 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/messageStore/ broker/src/main/java/org/apache/qpid/server/security/auth/database/ broker/src/main/java/org/apache/qpid/server/txn/ common/src/main/java/org/apache/qpid/util/concurrent/

Author: rupertlssmith
Date: Tue May 22 06:45:50 2007
New Revision: 540584

URL: http://svn.apache.org/viewvc?view=rev&rev=540584
Log:
Eliminated catch/rethrow where underlying cause was discarded. All causes now wrapped.

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=540584&r1=540583&r2=540584
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Tue May 22 06:45:50 2007
@@ -48,6 +48,8 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -56,12 +58,10 @@
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.management.ManagedBroker;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exception.InternalErrorException;
-import org.apache.qpid.server.exception.QueueAlreadyExistsException;
 
 /**
  * This MBean implements the broker management interface and exposes the
@@ -113,8 +113,9 @@
                 Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
                 if (exchange == null)
                 {
-                    exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type),
-                                                               durable, false, 0);
+                    exchange =
+                        _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable,
+                            false, 0);
                     _exchangeRegistry.registerExchange(exchange);
                 }
                 else
@@ -183,9 +184,12 @@
                 try
                 {
                     _messageStore.createQueue(queue);
-                } catch (Exception e)
+                }
+                catch (Exception e)
                 {
-                    throw new JMException("problem creating queue " + queue.getName());
+                    JMException jme = new JMException("problem creating queue " + queue.getName());
+                    jme.initCause(e);
+                    throw jme;
                 }
             }
 
@@ -200,9 +204,7 @@
         }
         catch (AMQException ex)
         {
-            JMException jme = new JMException(ex.getMessage());
-            jme.initCause(ex);
-            throw new MBeanException(jme, "Error in creating queue " + queueName);
+            throw new MBeanException(ex, "Error in creating queue " + queueName);
         }
     }
 
@@ -228,17 +230,16 @@
         try
         {
             queue.delete();
-            if( queue.isDurable() )
+            if (queue.isDurable())
             {
                 _messageStore.destroyQueue(queue);
             }
         }
         catch (Exception ex)
         {
-           ex.printStackTrace();
-            JMException jme = new JMException(ex.getMessage());
-            jme.initCause(ex);
-            throw new MBeanException(jme, "Error in deleting queue " + queueName);
+            /*ex.printStackTrace();
+            JMException jme = new JMException(ex.getMessage());*/
+            throw new MBeanException(ex, "Error in deleting queue " + queueName);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java?view=diff&rev=540584&r1=540583&r2=540584
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java Tue May 22 06:45:50 2007
@@ -181,7 +181,7 @@
                 pstmt.executeUpdate();
             } catch (Exception e)
             {
-                throw new InternalErrorException("Cannot create Exchange: " + exchange);
+                throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
             } finally
             {
                 if (connection != null)
@@ -195,7 +195,7 @@
                         // we did not manage to commit this connection
                         // it is better to release it
                         _connectionPool.releaseDeadInstance();
-                        throw new InternalErrorException("Cannot create Exchange: " + exchange);
+                        throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
                     }
                 }
             }
@@ -223,7 +223,7 @@
                 pstmt.executeUpdate();
             } catch (Exception e)
             {
-                throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+                throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
             } finally
             {
                 if (connection != null)
@@ -237,7 +237,7 @@
                         // we did not manage to commit this connection
                         // it is better to release it
                         _connectionPool.releaseDeadInstance();
-                        throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+                        throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
                     }
                 }
             }
@@ -274,7 +274,7 @@
                 pstmt.executeUpdate();
             } catch (Exception e)
             {
-                throw new InternalErrorException("Cannot create Exchange: " + exchange);
+                throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
             } finally
             {
                 if (connection != null)
@@ -288,7 +288,7 @@
                         // we did not manage to commit this connection
                         // it is better to release it
                         _connectionPool.releaseDeadInstance();
-                        throw new InternalErrorException("Cannot create Exchange: " + exchange);
+                        throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
                     }
                 }
             }
@@ -316,7 +316,7 @@
             pstmt.executeUpdate();
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+            throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
         } finally
         {
             if (connection != null)
@@ -330,7 +330,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+                    throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
                 }
             }
         }
@@ -364,7 +364,7 @@
             pstmt.executeUpdate();
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot create Queue: " + queue);
+            throw new InternalErrorException("Cannot create Queue: " + queue, e);
         } finally
         {
             if (connection != null)
@@ -378,7 +378,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot create Queue: " + queue);
+                    throw new InternalErrorException("Cannot create Queue: " + queue, e);
                 }
             }
         }
@@ -404,7 +404,7 @@
             pstmt.executeUpdate();
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot remove Queue: " + queue);
+            throw new InternalErrorException("Cannot remove Queue: " + queue, e);
         } finally
         {
             if (connection != null)
@@ -418,7 +418,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot remove Queue: " + queue);
+                    throw new InternalErrorException("Cannot remove Queue: " + queue, e);
                 }
             }
         }
@@ -441,7 +441,7 @@
             stage(connection, m);
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot stage Message: " + m);
+            throw new InternalErrorException("Cannot stage Message: " + m, e);
         } finally
         {
             if (connection != null)
@@ -455,7 +455,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot stage Message: " + m);
+                    throw new InternalErrorException("Cannot stage Message: " + m, e);
                 }
             }
         }
@@ -481,7 +481,7 @@
             appendContent(connection, m, data, offset, size);
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot stage Message: " + m);
+            throw new InternalErrorException("Cannot stage Message: " + m, e);
         } finally
         {
             if (connection != null)
@@ -495,7 +495,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot stage Message: " + m);
+                    throw new InternalErrorException("Cannot stage Message: " + m, e);
                 }
             }
         }
@@ -545,7 +545,7 @@
             return result;
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot load Message: " + m);
+            throw new InternalErrorException("Cannot load Message: " + m, e);
         } finally
         {
             if (connection != null)
@@ -559,7 +559,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot load Message: " + m);
+                    throw new InternalErrorException("Cannot load Message: " + m, e);
                 }
             }
         }
@@ -577,7 +577,7 @@
             destroy(connection, m);
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot destroy message: " + m);
+            throw new InternalErrorException("Cannot destroy message: " + m, e);
         } finally
         {
             if (connection != null)
@@ -591,7 +591,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot destroy message: " + m);
+                    throw new InternalErrorException("Cannot destroy message: " + m, e);
                 }
             }
         }
@@ -644,7 +644,7 @@
                 queue.enqueue(m);
             } catch (Exception e)
             {
-                throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+                throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
             } finally
             {
                 if (tx == null && connection != null)
@@ -658,7 +658,7 @@
                         // we did not manage to commit this connection
                         // it is better to release it
                         _connectionPool.releaseDeadInstance();
-                        throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+                        throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
                     }
                 }
             }
@@ -710,7 +710,7 @@
                 queue.dequeue(m);
             } catch (Exception e)
             {
-                throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+                throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
             } finally
             {
                 if (tx == null && connection != null)
@@ -724,7 +724,7 @@
                         // we did not manage to commit this connection
                         // it is better to release it
                         _connectionPool.releaseDeadInstance();
-                        throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+                        throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
                     }
                 }
             }
@@ -762,7 +762,7 @@
             return result;
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot get all queues");
+            throw new InternalErrorException("Cannot get all queues", e);
         } finally
         {
             if (connection != null)
@@ -776,7 +776,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot get all queues");
+                    throw new InternalErrorException("Cannot get all queues", e);
                 }
             }
         }
@@ -793,7 +793,7 @@
             return getAllMessages(connection, queue);
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot get all queues");
+            throw new InternalErrorException("Cannot get all queues", e);
         } finally
         {
             if (connection != null)
@@ -807,7 +807,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot get all queues");
+                    throw new InternalErrorException("Cannot get all queues", e);
                 }
             }
         }
@@ -922,7 +922,7 @@
             // we did not manage to commit this connection
             // it is better to release it
             _connectionPool.releaseDeadInstance();
-            throw new InternalErrorException("Cannot commit connection =");
+            throw new InternalErrorException("Cannot commit connection =", e);
         }
     }
 
@@ -939,7 +939,7 @@
             // we did not manage to rollback this connection
             // it is better to release it
             _connectionPool.releaseDeadInstance();
-            throw new InternalErrorException("Cannot rollback connection");
+            throw new InternalErrorException("Cannot rollback connection", e);
         }
     }
 
@@ -1029,7 +1029,7 @@
             pstmt.executeUpdate();
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot save record: " + record);
+            throw new InternalErrorException("Cannot save record: " + record, e);
         }
     }
 
@@ -1053,7 +1053,7 @@
             pstmt.executeUpdate();
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot save xid: " + xid);
+            throw new InternalErrorException("Cannot save xid: " + xid, e);
         }
     }
 
@@ -1074,7 +1074,7 @@
             pstmt.executeUpdate();
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot delete record: " + tx.getXidID());
+            throw new InternalErrorException("Cannot delete record: " + tx.getXidID(), e);
         }
     }
 
@@ -1095,7 +1095,7 @@
             pstmt.executeUpdate();
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot delete xid: " + tx.getXidID());
+            throw new InternalErrorException("Cannot delete xid: " + tx.getXidID(), e);
         }
     }
 
@@ -1212,7 +1212,7 @@
             return result;
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+            throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m, e);
         } finally
         {
             if (connection != null)
@@ -1226,7 +1226,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+                    throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m, e);
                 }
             }
         }
@@ -1261,7 +1261,7 @@
             return result;
         } catch (Exception e)
         {
-            throw new InternalErrorException("Cannot get Content Header of message: " + m);
+            throw new InternalErrorException("Cannot get Content Header of message: " + m, e);
         } finally
         {
             if (connection != null)
@@ -1275,7 +1275,7 @@
                     // we did not manage to commit this connection
                     // it is better to release it
                     _connectionPool.releaseDeadInstance();
-                    throw new InternalErrorException("Cannot get Content Header of message: " + m);
+                    throw new InternalErrorException("Cannot get Content Header of message: " + m, e);
                 }
             }
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?view=diff&rev=540584&r1=540583&r2=540584
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Tue May 22 06:45:50 2007
@@ -27,20 +27,20 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.management.JMException;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.access.AMQUserManagementMBean;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.access.AMQUserManagementMBean;
-import org.apache.qpid.AMQException;
-
-import javax.management.JMException;
 
 public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatabaseManager
 {
@@ -107,7 +107,7 @@
     }
 
     private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index)
-            throws FileNotFoundException, ConfigurationException
+        throws FileNotFoundException, ConfigurationException
     {
         String baseName = _base + "(" + index + ").attributes.attribute.";
         List<String> argumentNames = config.getList(baseName + "name");
@@ -139,9 +139,9 @@
             if (method == null)
             {
                 throw new ConfigurationException("No method " + methodName + " found in class "
-                                                 + principalDatabase.getClass()
-                                                 + " hence unable to configure principal database. The method must be public and "
-                                                 + "have a single String argument with a void return type");
+                    + principalDatabase.getClass()
+                    + " hence unable to configure principal database. The method must be public and "
+                    + "have a single String argument with a void return type");
             }
 
             try
@@ -152,7 +152,7 @@
             {
                 if (ite instanceof ConfigurationException)
                 {
-                    throw(ConfigurationException) ite;
+                    throw (ConfigurationException) ite;
                 }
                 else
                 {
@@ -178,7 +178,8 @@
 
             if (principalDBs.size() == 0)
             {
-                throw new ConfigurationException("No principal-database specified for jmx security(" + baseSecurity + ".principal-database)");
+                throw new ConfigurationException("No principal-database specified for jmx security(" + baseSecurity
+                    + ".principal-database)");
             }
 
             String databaseName = principalDBs.get(0);
@@ -196,18 +197,19 @@
 
             if (jmxaccesslist.size() == 0)
             {
-                throw new ConfigurationException("No access control files specified for jmx security(" + baseSecurity + ".access)");
+                throw new ConfigurationException("No access control files specified for jmx security(" + baseSecurity
+                    + ".access)");
             }
 
             String jmxaccesssFile = null;
-            
+
             try
             {
                 jmxaccesssFile = PropertyUtils.replaceProperties(jmxaccesslist.get(0));
             }
             catch (PropertyException e)
             {
-                throw new ConfigurationException("Unable to parse access control filename '" + jmxaccesssFile + "'");
+                throw new ConfigurationException("Unable to parse access control filename '" + jmxaccesssFile + "'", e);
             }
 
             try

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java?view=diff&rev=540584&r1=540583&r2=540584
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java Tue May 22 06:45:50 2007
@@ -191,7 +191,7 @@
                 } catch (Exception e)
                 {
                     _log.error("Cannot prepare tx: " + xid);
-                    throw new InternalErrorException("Cannot prepare tx: " + xid);
+                    throw new InternalErrorException("Cannot prepare tx: " + xid, e);
                 }
                 tx.prepare();
             }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java?view=diff&rev=540584&r1=540583&r2=540584
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java Tue May 22 06:45:50 2007
@@ -37,11 +37,6 @@
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * </table>
- *
- * @todo To create zero garbage collecting implemention will need to adapt the queue element containers
- *       (SynchRefImpl) in such a way that one is needed per array element, they can be taken from/put back/cleared in
- *       the queue without actually being moved from the array and they implement a way of forming them into a
- *       collection (or Iterable) to pass to consumers (using a linked list scheme?). May not be worth the trouble.
  */
 public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
 {
@@ -350,7 +345,7 @@
      * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
      * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
      *
-     * @param e The data element to put into the queue.
+     * @param e The data element to put into the queue. Cannot be null.
      *
      * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
      *                              on its entry in the queue being consumed.
@@ -390,7 +385,7 @@
      * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
      * Any producer that has its data element taken by this call will be immediately unblocked. To keep the
      * producer blocked whilst taking just a single item, use the
-     * {@link #drainTo(java.util.Collection<uk.co.thebadgerset.common.util.concurrent.SynchRecord<E>>, int, boolean)}
+     * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)}
      * method. There is no take method to do that because there is not usually any advantage in a synchronous hand
      * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption
      * latencies accross many producers where possible.