You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/05/17 15:27:42 UTC

svn commit: r538914 - in /incubator/qpid/trunk/qpid: ./ java/common/src/main/java/org/apache/qpid/common/ java/common/src/main/java/org/apache/qpid/configuration/ java/common/src/main/java/org/apache/qpid/exchange/ java/common/src/main/java/org/apache/...

Author: rupertlssmith
Date: Thu May 17 06:27:40 2007
New Revision: 538914

URL: http://svn.apache.org/viewvc?view=rev&rev=538914
Log:
Merged revisions 538907 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r538907 | rupertlssmith | 2007-05-17 14:17:59 +0100 (Thu, 17 May 2007) | 1 line
  
  Added to the Javadoc
........

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java Thu May 17 06:27:40 2007
@@ -14,27 +14,46 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.common;
 
 import org.apache.qpid.framing.AMQShortString;
 
+/**
+ * Specifies the different filter types for consumers that filter their messages.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent different consumer filter types.
+ * </table>
+ */
 public enum AMQPFilterTypes
 {
     JMS_SELECTOR("x-filter-jms-selector"),
     NO_CONSUME("x-filter-no-consume"),
     AUTO_CLOSE("x-filter-auto-close");
 
+    /** The identifying string for the filter type. */
     private final AMQShortString _value;
 
+    /**
+     * Creates a new filter type from its identifying string.
+     *
+     * @param value The identifying string.
+     */
     AMQPFilterTypes(String value)
     {
         _value = new AMQShortString(value);
     }
 
+    /**
+     * Gets the identifying string of the filter type.
+     *
+     * @return The identifying string of the filter type.
+     */
     public AMQShortString getValue()
     {
         return _value;

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java Thu May 17 06:27:40 2007
@@ -14,12 +14,20 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.common;
 
+/**
+ * Specifies the available client property types that different clients can use to identify themselves with.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Specify the available client property types.
+ * </table>
+ */
 public enum ClientProperties
 {
     instance,

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java Thu May 17 06:27:40 2007
@@ -14,9 +14,9 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.common;
 
@@ -27,23 +27,56 @@
 
 import org.apache.log4j.Logger;
 
+/**
+ * QpidProperties captures the project name, version number, and source code repository revision number from a properties
+ * file which is generated as part of the build process. Normally, the name and version number are pulled from the module
+ * name and version number of the Maven build POM, but could come from other sources if the build system is changed. The
+ * idea behind this, is that every build has these values incorporated directly into its jar file, so that code in the
+ * wild can be identified, should its origination be forgotten.
+ *
+ * <p/>To get the build version of any Qpid code call the {@link #main} method. This version string is usually also
+ * printed to the console on broker start up.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td>Load build versioning information into the runtime, for code identification purposes.
+ * </table>
+ *
+ * @todo Code to locate/load/log properties can be factored into a reusable properties utils class. Avoid having this
+ *       same snippet of loading code scattered in many places.
+ *
+ * @todo Could also add a build number property for a sequential build number assigned by an automated build system, for
+ *       build reproducibility purposes.
+ */
 public class QpidProperties
 {
+    /** Used for debugging purposes. */
     private static final Logger _logger = Logger.getLogger(QpidProperties.class);
-    
+
+    /** The name of the version properties file to load from the class path. */
     public static final String VERSION_RESOURCE = "qpidversion.properties";
 
+    /** Defines the name of the product property. */
     public static final String PRODUCT_NAME_PROPERTY = "qpid.name";
+
+    /** Defines the name of the version property. */
     public static final String RELEASE_VERSION_PROPERTY = "qpid.version";
+
+    /** Defines the name of the source code revision property. */
     public static final String BUILD_VERSION_PROPERTY = "qpid.svnversion";
 
+    /** Defines the default value for all properties that cannot be loaded. */
     private static final String DEFAULT = "unknown";
 
+    /** Holds the product name. */
     private static String productName = DEFAULT;
+
+    /** Holds the product version. */
     private static String releaseVersion = DEFAULT;
+
+    /** Holds the source code revision. */
     private static String buildVersion = DEFAULT;
 
-    /** Loads the values from the version properties file. */
+    // Loads the values from the version properties file.
     static
     {
         Properties props = new Properties();
@@ -62,16 +95,17 @@
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Dumping QpidProperties");
-                    for (Map.Entry<Object,Object> entry : props.entrySet())
+                    for (Map.Entry<Object, Object> entry : props.entrySet())
                     {
-                        _logger.debug("Property: " + entry.getKey() + " Value: "+ entry.getValue());
+                        _logger.debug("Property: " + entry.getKey() + " Value: " + entry.getValue());
                     }
+
                     _logger.debug("End of property dump");
                 }
 
                 productName = readPropertyValue(props, PRODUCT_NAME_PROPERTY);
                 releaseVersion = readPropertyValue(props, RELEASE_VERSION_PROPERTY);
-                buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY);                
+                buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY);
             }
         }
         catch (IOException e)
@@ -81,26 +115,56 @@
         }
     }
 
+    /**
+     * Gets the product name.
+     *
+     * @return The product name.
+     */
     public static String getProductName()
     {
         return productName;
     }
 
+    /**
+     * Gets the product version.
+     *
+     * @return The product version.
+     */
     public static String getReleaseVersion()
     {
         return releaseVersion;
     }
 
+    /**
+     * Gets the source code revision.
+     *
+     * @return The source code revision.
+     */
     public static String getBuildVersion()
     {
         return buildVersion;
     }
 
+    /**
+     * Extracts all of the version information as a printable string.
+     *
+     * @return All of the version information as a printable string.
+     */
     public static String getVersionString()
     {
         return getProductName() + " - " + getReleaseVersion() + " build: " + getBuildVersion();
     }
 
+    /**
+     * Helper method to extract a named property from properties.
+     *
+     * @param props        The properties.
+     * @param propertyName The named property to extract.
+     *
+     * @return The extracted property or a default value if the properties do not contain the named property.
+     *
+     * @todo A bit pointless.
+     */
     private static String readPropertyValue(Properties props, String propertyName)
     {
         String retVal = (String) props.get(propertyName);
@@ -108,9 +172,16 @@
         {
             retVal = DEFAULT;
         }
+
         return retVal;
     }
 
+    /**
+     * Prints the versioning information to the console. This is extremely useful for identifying Qpid code in the
+     * wild, where the origination of the code has been forgotten.
+     *
+     * @param args Does not require any arguments.
+     */
     public static void main(String[] args)
     {
         System.out.println(getVersionString());

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,7 +26,7 @@
 import java.lang.annotation.Target;
 
 /**
- * Marks a field as being "configured" externally.
+ * Marks a field as having a "configured" value injected into it by a configurator.
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.FIELD)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java Thu May 17 06:27:40 2007
@@ -24,7 +24,16 @@
 import org.apache.qpid.protocol.AMQConstant;
 
 /**
- * Indicates an error parsing a property expansion.
+ * Indicates a failure to parse a property expansion. See {@link PropertyUtils} for the code that does property
+ * expansions.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to expand a property name into a value.
+ * </table>
+ *
+ * @todo AMQException is to be reserved for protocol related conditions. This exception does not have a status code, so
+ *       don't inherit from AMQException.
  */
 public class PropertyException extends AMQException
 {
@@ -33,6 +42,7 @@
         super(message);
     }
 
+    /*
     public PropertyException(String msg, Throwable t)
     {
         super(msg, t);
@@ -47,4 +57,5 @@
     {
         super(errorCode, msg);
     }
+     */
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,24 +24,35 @@
 import java.util.Iterator;
 
 /**
- * Based on code in Apache Ant, this utility class handles property expansion. This
- * is most useful in config files and so on.
+ * PropertyUtils provides helper methods for dealing with Java properties.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Expand system properties into strings with named expansions.
+ * </table>
+ *
+ * @todo Make the lookup method generic by passing in the properties to use for the expansion, rather than hard coding
+ *       as system properties. The expansion code has greater potential for re-use that way.
+ *
+ * @todo Some more property related code could be added to this utils class, which might more appropriately reside under
+ *       org.apache.qpid.util. For example standardised code to load properties from a resource name, currently found in
+ *       QpidProperties and possibly other places could be moved here.
  */
 public class PropertyUtils
 {
     /**
-     * Replaces <code>${xxx}</code> style constructions in the given value
-     * with the string value of the corresponding data types. Replaces only system
-     * properties
+     * Given a string that contains substrings of the form <code>${xxx}</code>, looks up the values of 'xxx' as
+     * system properties and substitutes them back into the original string, to provide a property value expanded
+     * string.
      *
-     * @param value The string to be scanned for property references.
-     *              May be <code>null</code>, in which case this
+     * @param value The string to be scanned for property references. May be <code>null</code>, in which case this
      *              method returns immediately with no effect.
-     * @return the original string with the properties replaced, or
-     *         <code>null</code> if the original string is <code>null</code>.
-     * @throws PropertyException if the string contains an opening
-     *                           <code>${</code> without a closing
-     *                           <code>}</code>
+     *
+     * @return The original string with the properties replaced, or <code>null</code> if the original string is
+     *         <code>null</code>.
+     *
+     * @throws PropertyException If the string contains an opening <code>${</code> without a balancing <code>}</code>,
+     *                           or if the property to expand does not exist as a system property.
      */
     public static String replaceProperties(String value) throws PropertyException
     {
@@ -69,11 +80,12 @@
 
                 if (replacement == null)
                 {
-                    throw new PropertyException("Property ${" + propertyName +
-                                                "} has not been set");
+                    throw new PropertyException("Property ${" + propertyName + "} has not been set");
                 }
+
                 fragment = replacement;
             }
+
             sb.append(fragment);
         }
 
@@ -81,32 +93,30 @@
     }
 
     /**
-     * Default parsing method. Parses the supplied value for properties which are specified
-     * using ${foo} syntax. $X is left as is, and $$ specifies a single $.
-     * @param value the property string to parse
-     * @param fragments is populated with the string fragments. A null means "insert a
-     * property value here. The number of nulls in the list when populated is equal to the
-     * size of the propertyRefs list
-     * @param propertyRefs populated with the property names to be added into the final
-     * String.
+     * Parses the supplied value for properties which are specified using ${foo} syntax. $X is left as is, and $$
+     * specifies a single $.
+     *
+     * @param value        The property string to parse.
+     * @param fragments    Is populated with the string fragments. A null means "insert a property value here". The number
+     *                     of nulls in the list when populated is equal to the size of the propertyRefs list.
+     * @param propertyRefs Populated with the property names to be added into the final string.
      */
-    private static void parsePropertyString(String value, ArrayList<String> fragments,
-                                            ArrayList<String> propertyRefs)
-            throws PropertyException
+    private static void parsePropertyString(String value, ArrayList<String> fragments, ArrayList<String> propertyRefs)
+        throws PropertyException
     {
         int prev = 0;
         int pos;
-        //search for the next instance of $ from the 'prev' position
+        // search for the next instance of $ from the 'prev' position
         while ((pos = value.indexOf("$", prev)) >= 0)
         {
 
-            //if there was any text before this, add it as a fragment
+            // if there was any text before this, add it as a fragment
             if (pos > 0)
             {
                 fragments.add(value.substring(prev, pos));
             }
-            //if we are at the end of the string, we tack on a $
-            //then move past it
+            // if we are at the end of the string, we tack on a $
+            // then move past it
             if (pos == (value.length() - 1))
             {
                 fragments.add("$");
@@ -114,8 +124,8 @@
             }
             else if (value.charAt(pos + 1) != '{')
             {
-                //peek ahead to see if the next char is a property or not
-                //not a property: insert the char as a literal
+                // peek ahead to see if the next char is a property or not
+                // not a property: insert the char as a literal
                 if (value.charAt(pos + 1) == '$')
                 {
                     // two $ map to one $
@@ -135,22 +145,20 @@
                 int endName = value.indexOf('}', pos);
                 if (endName < 0)
                 {
-                    throw new PropertyException("Syntax error in property: " +
-                                                value);
+                    throw new PropertyException("Syntax error in property: " + value);
                 }
+
                 String propertyName = value.substring(pos + 2, endName);
                 fragments.add(null);
                 propertyRefs.add(propertyName);
                 prev = endName + 1;
             }
         }
-        //no more $ signs found
-        //if there is any tail to the file, append it
+        // no more $ signs found
+        // if there is any tail to the file, append it
         if (prev < value.length())
         {
             fragments.add(value.substring(prev));
         }
     }
-
-
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,25 +22,44 @@
 
 import org.apache.qpid.framing.AMQShortString;
 
+/**
+ * Defines the names of the standard AMQP exchanges that every AMQP broker should provide. These exchange names
+ * and types are given in the specification.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Defines the standard AMQP exchange names.
+ * <tr><td> Defines the standard AMQP exchange types.
+ * </table>
+ *
+ * @todo A type safe enum, might be more appropriate for the exchange types.
+ */
 public class ExchangeDefaults
 {
-    public final static AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>");
-
-    public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic");
-
-    public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic");
+    /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */
+    public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>");
 
-    public final static AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct");
+    /** The pre-defined topic exchange, the broker SHOULD provide this. */
+    public static final AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic");
 
-    public final static AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct");
+    /** Defines the identifying type name of topic exchanges. */
+    public static final AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic");
 
-    public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match");
+    /** The pre-defined direct exchange, the broker MUST provide this. */
+    public static final AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct");
 
-    public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers");
+    /** Defines the identifying type name of direct exchanges. */
+    public static final AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct");
 
-    public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout");
+    /** The pre-defined headers exchange, the specification does not say this needs to be provided. */
+    public static final AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match");
 
-    public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
+    /** Defines the identifying type name of headers exchanges. */
+    public static final AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers");
 
+    /** The pre-defined fanout exchange, the broker MUST provide this. */
+    public static final AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout");
 
+    /** Defines the identifying type name of fanout exchanges. */
+    public static final AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,18 +23,51 @@
 import org.apache.mina.common.IoFilter;
 import org.apache.mina.common.IoSession;
 
-
-abstract public class Event
+/**
+ * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain
+ * for later processing. It is an abstract class, with different implementations for continuations of different kinds
+ * of Mina events.
+ *
+ * <p/>These continuations are typically batched by {@link Job} for processing by a worker thread pool.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Process a continuation in the context of a Mina session.
+ * </table>
+ *
+ * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static
+ *       to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of
+ *       the context of the outer Event class. The inner class construction used here is preventing common code re-use
+ *       (though not by a huge amount), but makes for an inelegant way of handling inheritance and doesn't seem like
+ *       a justifiable use of inner classes. Move the inner classes out into their own files.
+ *
+ * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as
+ *       a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract,
+ *       it is really an interface, so could just drop it and use the continuation interface instead.
+ */
+public abstract class Event
 {
-
+    /**
+     * Creates a continuation.
+     */
     public Event()
-    {
-    }
-
-
-    abstract public void process(IoSession session);
-
+    { }
 
+    /**
+     * Processes the continuation in the context of a Mina session.
+     *
+     * @param session The Mina session.
+     */
+    public abstract void process(IoSession session);
+
+    /**
+     * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter.
+     *
+     * <p/><table id="crc"><caption>CRC Card</caption>
+     * <tr><th> Responsibilities <th> Collaborations
+     * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
+     * </table>
+     */
     public static final class ReceivedEvent extends Event
     {
         private final Object _data;
@@ -59,7 +92,15 @@
         }
     }
 
-
+    /**
+     * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter.
+     *
+     * <p/><table id="crc"><caption>CRC Card</caption>
+     * <tr><th> Responsibilities <th> Collaborations
+     * <tr><td> Pass a Mina filterWrite event to a NextFilter.
+     *     <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession}
+     * </table>
+     */
     public static final class WriteEvent extends Event
     {
         private final IoFilter.WriteRequest _data;
@@ -72,7 +113,6 @@
             _data = data;
         }
 
-
         public void process(IoSession session)
         {
             _nextFilter.filterWrite(session, _data);
@@ -84,8 +124,14 @@
         }
     }
 
-
-
+    /**
+     * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter.
+     *
+     * <p/><table id="crc"><caption>CRC Card</caption>
+     * <tr><th> Responsibilities <th> Collaborations
+     * <tr><td> Pass a Mina sessionClosed event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
+     * </table>
+     */
     public static final class CloseEvent extends Event
     {
         private final IoFilter.NextFilter _nextFilter;
@@ -96,7 +142,6 @@
             _nextFilter = nextFilter;
         }
 
-
         public void process(IoSession session)
         {
             _nextFilter.sessionClosed(session);
@@ -107,5 +152,4 @@
             return _nextFilter;
         }
     }
-
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,48 +26,77 @@
 import org.apache.mina.common.IoSession;
 
 /**
- * Holds events for a session that will be processed asynchronously by
- * the thread pool in PoolingFilter.
+ * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
+ * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially
+ * processing all of its aggregated events.
+ *
+ * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when
+ * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events
+ * may be processed in each run of the job, several runs may be required to clear the queue.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Aggregate many continuations together into a single continuation.
+ * <tr><td> Sequentially process aggregated continuations. <td> {@link Event}
+ * <tr><td> Provide running and completion status of the aggregate continuation.
+ * <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler}
+ * </table>
+ *
+ * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a
+ *       continuation. Job is a continuation that aggregates other continuations and as such is a useful re-usable
+ *       piece of code. There may be other places than the mina filter chain where continuation batching is used within
+ *       qpid, so abstracting this out could provide a useful building block. This also opens the way to different
+ *       kinds of job with a common interface, e.g. parallel or sequential jobs etc.
+ *
+ * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
  */
 public class Job implements Runnable
 {
+    /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
     private final int _maxEvents;
+
+    /** The Mina session. */
     private final IoSession _session;
+
+    /** Holds the queue of events that make up the job. */
     private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+
+    /** Holds a status flag, that indicates when the job is actively running. */
     private final AtomicBoolean _active = new AtomicBoolean();
-    //private final AtomicInteger _refCount = new AtomicInteger();
+
+    /** Holds the completion continuation, called upon completion of a run of the job. */
     private final JobCompletionHandler _completionHandler;
 
+    /**
+     * Creates a new job that aggregates many continuations together.
+     *
+     * @param session           The Mina session.
+     * @param completionHandler The per job run, terminal continuation.
+     * @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
+     */
     Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
     {
         _session = session;
         _completionHandler = completionHandler;
         _maxEvents = maxEvents;
     }
-//
-//    void acquire()
-//    {
-//        _refCount.incrementAndGet();
-//    }
-//
-//    void release()
-//    {
-//        _refCount.decrementAndGet();
-//    }
-//
-//    boolean isReferenced()
-//    {
-//        return _refCount.get() > 0;
-//    }
 
+    /**
+     * Enqueues a continuation for sequential processing by this job.
+     *
+     * @param evt The continuation to enqueue.
+     */
     void add(Event evt)
     {
         _eventQueue.add(evt);
     }
 
+    /**
+     * Sequentially processes, up to the maximum number per job, the aggregated continuations enqueued in this job.
+     */
     void processAll()
     {
-        //limit the number of events processed in one run
+        // limit the number of events processed in one run
         for (int i = 0; i < _maxEvents; i++)
         {
             Event e = _eventQueue.poll();
@@ -82,21 +111,37 @@
         }
     }
 
-    boolean isComplete()
+    /**
+     * Tests if there are no more enqueued continuations to process.
+     *
+     * @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise.
+     */
+    public boolean isComplete()
     {
         return _eventQueue.peek() == null;
     }
 
-    boolean activate()
+    /**
+     * Marks this job as active if it is inactive. This method is thread safe.
+     *
+     * @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise.
+     */
+    public boolean activate()
     {
         return _active.compareAndSet(false, true);
     }
 
-    void deactivate()
+    /**
+     * Marks this job as inactive. This method is thread safe.
+     */
+    public void deactivate()
     {
         _active.set(false);
     }
 
+    /**
+     * Processes a batch of aggregated continuations, marks this job as inactive and calls the terminal continuation.
+     */
     public void run()
     {
         processAll();
@@ -104,7 +149,12 @@
         _completionHandler.completed(_session, this);
     }
 
-
+    /**
+     * Another interface for a continuation.
+     *
+     * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
+     *       Runnable or a custom Continuation interface.
+     */
     static interface JobCompletionHandler
     {
         public void completed(IoSession session, Job job);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu May 17 06:27:40 2007
@@ -31,22 +31,131 @@
 
 import org.apache.qpid.pool.Event.CloseEvent;
 
-public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
+/**
+ * PoolingFilter is a no-op pass-through filter that hands all events down the Mina filter chain by default. As it
+ * adds no behaviour by default to the filter chain, it is abstract.
+ *
+ * <p/>PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by
+ * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is
+ * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as
+ * a buffer between stages of the pipeline.
+ *
+ * <p/>There are two convenience methods, {@link #createAynschReadPoolingFilter} and
+ * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and
+ * 'filterWrite' events, making it possible to process these event streams separately.
+ *
+ * <p/>Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the
+ * Mina session they are working with, and store it in the session against their identifying name. This allows different
+ * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their
+ * workloads in different jobs.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Implement default, pass through filter.
+ * <tr><td> Create pooling filters and a specific thread pool. <td> {@link ReferenceCountingExecutorService}
+ * <tr><td> Provide the ability to batch Mina events for asynchronous processing. <td> {@link Job}, {@link Event}
+ * <tr><td> Provide a terminal continuation to keep jobs running till empty.
+ *     <td> {@link Job}, {@link Job.JobCompletionHandler}
+ * </table>
+ *
+ * @todo This seems a bit bizarre. ReadWriteThreadModel creates separate pooling filters for read and write events.
+ *       The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
+ *       pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
+ *       so there is concurrency. But why go to the trouble of separating out the read and write events in that case?
+ *       Why not just batch them into jobs together? Perhaps it's so that separate thread pools could be used for these
+ *       stages.
+ *
+ * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
+ *       it. It's just that it runs them 10 at a time, but the completion handler here checks if there are more to run
+ *       and trips off another batch of 10 until they are all done. Why not just have a straightforward
+ *       consumer/producer queue scenario without the batches of 10?
+ *
+ * @todo The static helper methods are pointless. Could just call new.
+ */
+public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
 {
+    /** Used for debugging purposes. */
     private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
 
+    /** Holds a mapping from Mina sessions to batched jobs for execution. */
     private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
+
+    /** Holds the managed reference to obtain the executor for the batched jobs. */
     private final ReferenceCountingExecutorService _poolReference;
 
+    /** Used to hold a name for identifying different pooling filter types. */
     private final String _name;
+
+    /** Defines the maximum number of events that a job will process in a single run; more may be queued in the job. */
     private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
 
+    /**
+     * Creates a named pooling filter, on the specified shared thread pool.
+     *
+     * @param refCountingPool The thread pool reference.
+     * @param name            The identifying name of the filter type.
+     */
     public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
     {
         _poolReference = refCountingPool;
         _name = name;
     }
 
+    /**
+     * Helper method to get an instance of a pooling filter that handles read events asynchronously.
+     *
+     * @param refCountingPool A managed reference to the thread pool.
+     * @param name            The filter type's identifying name.
+     *
+     * @return A pooling filter for asynchronous read events.
+     */
+    public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+    {
+        return new AsynchReadPoolingFilter(refCountingPool, name);
+    }
+
+    /**
+     * Helper method to get an instance of a pooling filter that handles write events asynchronously.
+     *
+     * @param refCountingPool A managed reference to the thread pool.
+     * @param name            The filter type's identifying name.
+     *
+     * @return A pooling filter for asynchronous write events.
+     */
+    public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+    {
+        return new AsynchWritePoolingFilter(refCountingPool, name);
+    }
+
+    /**
+     * Called by Mina to initialize this filter. Takes a reference to the thread pool.
+     */
+    public void init()
+    {
+        _logger.info("Init called on PoolingFilter " + toString());
+
+        // Called when the filter is initialised in the chain. If the reference count is
+        // zero this acquire will initialise the pool.
+        _poolReference.acquireExecutorService();
+    }
+
+    /**
+     * Called by Mina to clean up this filter. Releases the reference to the thread pool.
+     */
+    public void destroy()
+    {
+        _logger.info("Destroy called on PoolingFilter " + toString());
+
+        // When the reference count gets to zero we release the executor service.
+        _poolReference.releaseExecutorService();
+    }
+
+    /**
+     * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+     *
+     * @param session The Mina session to work in.
+     * @param event   The event to hand off asynchronously.
+     */
     void fireAsynchEvent(IoSession session, Event event)
     {
         Job job = getJobForSession(session);
@@ -62,31 +171,50 @@
 
     }
 
+    /**
+     * Creates a Job on the Mina session, identified by this filter's name, in which this filter places asynchronously
+     * handled events.
+     *
+     * @param session The Mina session.
+     */
     public void createNewJobForSession(IoSession session)
     {
         Job job = new Job(session, this, _maxEvents);
         session.setAttribute(_name, job);
     }
 
+    /**
+     * Retrieves this filter's Job, by this filter's name, from the Mina session.
+     *
+     * @param session The Mina session.
+     *
+     * @return The Job for this filter to place asynchronous events into.
+     */
     private Job getJobForSession(IoSession session)
     {
         return (Job) session.getAttribute(_name);
     }
 
-    private Job createJobForSession(IoSession session)
+    /*private Job createJobForSession(IoSession session)
     {
         return addJobForSession(session, new Job(session, this, _maxEvents));
-    }
+    }*/
 
-    private Job addJobForSession(IoSession session, Job job)
+    /*private Job addJobForSession(IoSession session, Job job)
     {
         // atomic so ensures all threads agree on the same job
         Job existing = _jobs.putIfAbsent(session, job);
 
         return (existing == null) ? job : existing;
-    }
+    }*/
 
-    // Job.JobCompletionHandler
+    /**
+     * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+     * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
+     *
+     * @param session The Mina session to work in.
+     * @param job     The job that completed.
+     */
     public void completed(IoSession session, Job job)
     {
         // if (job.isComplete())
@@ -109,129 +237,228 @@
         }
     }
 
-    // IoFilter methods that are processed by threads on the pool
-
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
     {
         nextFilter.sessionOpened(session);
     }
 
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
     {
         nextFilter.sessionClosed(session);
     }
 
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     * @param status     The session idle status.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception
     {
         nextFilter.sessionIdle(session, status);
     }
 
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     * @param cause      The underlying exception.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception
     {
         nextFilter.exceptionCaught(session, cause);
     }
 
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     * @param message    The message received.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
     {
         nextFilter.messageReceived(session, message);
     }
 
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     * @param message    The message sent.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
     {
         nextFilter.messageSent(session, message);
     }
 
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter   The next filter in the chain.
+     * @param session      The Mina session.
+     * @param writeRequest The write request event.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
         throws Exception
     {
         nextFilter.filterWrite(session, writeRequest);
     }
 
-    // IoFilter methods that are processed on current thread (NOT on pooled thread)
-
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
     public void filterClose(NextFilter nextFilter, IoSession session) throws Exception
     {
         nextFilter.filterClose(session);
     }
 
-    public void sessionCreated(NextFilter nextFilter, IoSession session)
+    /**
+     * No-op pass through filter to the next filter in the chain.
+     *
+     * @param nextFilter The next filter in the chain.
+     * @param session    The Mina session.
+     *
+     * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+     *                   overriding sub-classes the ability to.
+     */
+    public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception
     {
         nextFilter.sessionCreated(session);
     }
 
+    /**
+     * Prints the filter type's identifying name to a string, mainly for debugging purposes.
+     *
+     * @return The filter type's identifying name.
+     */
     public String toString()
     {
         return _name;
     }
 
-    // LifeCycle methods
-
-    public void init()
-    {
-        _logger.info("Init called on PoolingFilter " + toString());
-        // called when the filter is initialised in the chain. If the reference count is
-        // zero this acquire will initialise the pool
-        _poolReference.acquireExecutorService();
-    }
-
-    public void destroy()
-    {
-        _logger.info("Destroy called on PoolingFilter " + toString());
-        // when the reference count gets to zero we release the executor service
-        _poolReference.releaseExecutorService();
-    }
-
+    /**
+     * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events
+     * asynchronously.
+     */
     public static class AsynchReadPoolingFilter extends PoolingFilter
     {
-
+        /**
+         * Creates a pooling filter that handles read events asynchronously.
+         *
+         * @param refCountingPool A managed reference to the thread pool.
+         * @param name            The filter type's identifying name.
+         */
         public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
             super(refCountingPool, name);
         }
 
-        public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message)
-            throws Exception
+        /**
+         * Hands off this event for asynchronous execution.
+         *
+         * @param nextFilter The next filter in the chain.
+         * @param session    The Mina session.
+         * @param message    The message received.
+         */
+        public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
         {
 
             fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
         }
 
-        public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+        /**
+         * Hands off this event for asynchronous execution.
+         *
+         * @param nextFilter The next filter in the chain.
+         * @param session    The Mina session.
+         */
+        public void sessionClosed(final NextFilter nextFilter, final IoSession session)
         {
             fireAsynchEvent(session, new CloseEvent(nextFilter));
         }
-
     }
 
+    /**
+     * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events
+     * asynchronously.
+     */
     public static class AsynchWritePoolingFilter extends PoolingFilter
     {
-
+        /**
+         * Creates a pooling filter that handles write events asynchronously.
+         *
+         * @param refCountingPool A managed reference to the thread pool.
+         * @param name            The filter type's identifying name.
+         */
         public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
             super(refCountingPool, name);
         }
 
+        /**
+         * Hands off this event for asynchronous execution.
+         *
+         * @param nextFilter   The next filter in the chain.
+         * @param session      The Mina session.
+         * @param writeRequest The write request event.
+         */
         public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
-            throws Exception
         {
             fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
         }
 
-        public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+        /**
+         * Hands off this event for asynchronous execution.
+         *
+         * @param nextFilter The next filter in the chain.
+         * @param session    The Mina session.
+         */
+        public void sessionClosed(final NextFilter nextFilter, final IoSession session)
         {
             fireAsynchEvent(session, new CloseEvent(nextFilter));
         }
-
-    }
-
-    public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
-    {
-        return new AsynchReadPoolingFilter(refCountingPool, name);
     }
-
-    public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
-    {
-        return new AsynchWritePoolingFilter(refCountingPool, name);
-    }
-
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,14 +24,34 @@
 import org.apache.mina.common.ThreadModel;
 import org.apache.mina.filter.ReferenceCountingIoFilter;
 
+/**
+ * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with separate filters to
+ * handle read and write events. The separate filters are {@link PoolingFilter}s, which have thread pools to handle
+ * these events. The effect of this is that reading and writing may happen concurrently.
+ *
+ * <p/>Socket i/o will only happen with concurrent reads and writes if Mina has separate selector threads for each.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create a filter chain with separate read and write thread pools for read/write Mina events.
+ *     <td> {@link PoolingFilter}
+ * </table>
+ */
 public class ReadWriteThreadModel implements ThreadModel
 {
-
+    /** Holds the singleton instance of this factory. */
     private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
 
+    /** Holds the thread pooling filter for reads. */
     private final PoolingFilter _asynchronousReadFilter;
+
+    /** Holds the thread pooling filter for writes. */
     private final PoolingFilter _asynchronousWriteFilter;
 
+    /**
+     * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that
+     * only a singleton instance of the factory is ever created.
+     */
     private ReadWriteThreadModel()
     {
         final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
@@ -39,25 +59,44 @@
         _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
     }
 
+    /**
+     * Gets the singleton instance of this filter chain factory.
+     *
+     * @return The singleton instance of this filter chain factory.
+     */
+    public static ReadWriteThreadModel getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Gets the read filter.
+     *
+     * @return The read filter.
+     */
     public PoolingFilter getAsynchronousReadFilter()
     {
         return _asynchronousReadFilter;
     }
 
+    /**
+     * Gets the write filter.
+     *
+     * @return The write filter.
+     */
     public PoolingFilter getAsynchronousWriteFilter()
     {
         return _asynchronousWriteFilter;
     }
 
-    public void buildFilterChain(IoFilterChain chain) throws Exception
+    /**
+     * Adds the concurrent read and write filters to a filter chain.
+     *
+     * @param chain The Mina filter chain to add to.
+     */
+    public void buildFilterChain(IoFilterChain chain)
     {
-
         chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
         chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
-    }
-
-    public static ReadWriteThreadModel getInstance()
-    {
-        return _instance;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,44 +24,87 @@
 import java.util.concurrent.Executors;
 
 /**
- * We share the executor service among several PoolingFilters. This class reference counts
- * how many filter chains are using the executor service and destroys the service, thus
- * freeing up its threads, when the count reaches zero. It recreates the service when
- * the count is incremented.
+ * ReferenceCountingExecutorService wraps an ExecutorService in order to provide a shared reference to it. It counts
+ * the references taken, instantiating the service on the first reference, and shutting it down when the last
+ * reference is released.
+ *
+ * <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM
+ * from terminating due to the existence of non-daemon threads.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a shared executor service. <td> {@link Executors}
+ * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
+ * <tr><td> Track references to the executor service.
+ * <tr><td> Provide configuration of the executor service.
+ * </table>
+ *
+ * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the
+ *       implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be
+ *       generalized to reference count anything. That is, on first instance call a create method, on release of last
+ *       instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a
+ *       reference counting factory. It could then be re-used to do reference counting in other places (such as
+ *       messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the
+ *       ref counting factory can call to manage the lifecycle.
  *
- * This is particularly important on the client where failing to destroy the executor
- * service prevents the JVM from shutting down due to the existence of non-daemon threads.
+ * @todo {@link #_poolSize} should be static?
  *
+ * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
+ *       further checks are applied to ensure that the executor service has not been shut down. This passes responsibility
+ *       for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
+ *       here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
+ *       isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
  */
 public class ReferenceCountingExecutorService
 {
+    /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
     private static final int MINIMUM_POOL_SIZE = 4;
+
+    /** Holds the number of processors on the machine. */
     private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
+
+    /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */
     private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);
 
     /**
-     * We need to be able to check the current reference count and if necessary
-     * create the executor service atomically.
+     * Holds the singleton instance of this reference counter. This is only created once, statically, so the
+     * {@link #getInstance()} method does not need to be synchronized.
      */
     private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();
 
+    /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */
     private final Object _lock = new Object();
 
+    /** The shared executor service that is reference counted. */
     private ExecutorService _pool;
 
+    /** Holds the number of references given out to the executor service. */
     private int _refCount = 0;
 
+    /** Holds the number of executor threads to create. */
     private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
 
+    /**
+     * Retrieves the singleton instance of this reference counter.
+     *
+     * @return The singleton instance of this reference counter.
+     */
     public static ReferenceCountingExecutorService getInstance()
     {
         return _instance;
     }
 
+    /**
+     * Private constructor to ensure that only a singleton instance can be created.
+     */
     private ReferenceCountingExecutorService()
-    {
-    }
+    { }
 
+    /**
+     * Provides a reference to a shared executor service, incrementing the reference count.
+     *
+     * @return An executor service.
+     */
     ExecutorService acquireExecutorService()
     {
         synchronized (_lock)
@@ -70,10 +113,15 @@
             {
                 _pool = Executors.newFixedThreadPool(_poolSize);
             }
+
             return _pool;
         }
     }
 
+    /**
+     * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls
+     * to zero, the executor service is shut down.
+     */
     void releaseExecutorService()
     {
         synchronized (_lock)
@@ -86,10 +134,9 @@
     }
 
     /**
-     * The filters that use the executor service should call this method to get access
-     * to the service. Note that this method does not alter the reference count.
+     * Provides access to the executor service, without touching the reference count.
      *
-     * @return the underlying executor service
+     * @return The shared executor service, or <tt>null</tt> if none has been instantiated yet.
      */
     public ExecutorService getPool()
     {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java Thu May 17 06:27:40 2007
@@ -36,7 +36,7 @@
  * that they take a default value when not set. Options may be mandatory in which case it is an error not to specify
  * them on the command line. Flags are never mandatory because they are implicitly set to false when not specified.
  *
- * <p/>Some examples command line are:
+ * <p/>Some example command lines are:
  *
  * <ul>
  * <li>This one has two options that expect arguments: