You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/05/17 15:27:42 UTC
svn commit: r538914 - in /incubator/qpid/trunk/qpid: ./
java/common/src/main/java/org/apache/qpid/common/
java/common/src/main/java/org/apache/qpid/configuration/
java/common/src/main/java/org/apache/qpid/exchange/
java/common/src/main/java/org/apache/...
Author: rupertlssmith
Date: Thu May 17 06:27:40 2007
New Revision: 538914
URL: http://svn.apache.org/viewvc?view=rev&rev=538914
Log:
Merged revisions 538907 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r538907 | rupertlssmith | 2007-05-17 14:17:59 +0100 (Thu, 17 May 2007) | 1 line
Added to the Javadoc
........
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java Thu May 17 06:27:40 2007
@@ -14,27 +14,46 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.common;
import org.apache.qpid.framing.AMQShortString;
+/**
+ * Specifies the different filter types for consumers that filter their messages.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent different consumer filter types.
+ * </table>
+ */
public enum AMQPFilterTypes
{
JMS_SELECTOR("x-filter-jms-selector"),
NO_CONSUME("x-filter-no-consume"),
AUTO_CLOSE("x-filter-auto-close");
+ /** The identifying string for the filter type. */
private final AMQShortString _value;
+ /**
+ * Creates a new filter type from its identifying string.
+ *
+ * @param value The identifying string.
+ */
AMQPFilterTypes(String value)
{
_value = new AMQShortString(value);
}
+ /**
+ * Gets the identifying string of the filter type.
+ *
+ * @return The identifying string of the filter type.
+ */
public AMQShortString getValue()
{
return _value;
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java Thu May 17 06:27:40 2007
@@ -14,12 +14,20 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.common;
+/**
+ * Specifies the available client property types that different clients can use to identify themselves with.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Specify the available client property types.
+ * </table>
+ */
public enum ClientProperties
{
instance,
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java Thu May 17 06:27:40 2007
@@ -14,9 +14,9 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.common;
@@ -27,23 +27,56 @@
import org.apache.log4j.Logger;
+/**
+ * QpidProperties captures the project name, version number, and source code repository revision number from a properties
+ * file which is generated as part of the build process. Normally, the name and version number are pulled from the module
+ * name and version number of the Maven build POM, but could come from other sources if the build system is changed. The
+ * idea behind this, is that every build has these values incorporated directly into its jar file, so that code in the
+ * wild can be identified, should its origination be forgotten.
+ *
+ * <p/>To get the build version of any Qpid code call the {@link #main} method. This version string is usually also
+ * printed to the console on broker start up.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td>Load build versioning information into the runtime, for code identification purposes.
+ * </table>
+ *
+ * @todo Code to locate/load/log properties can be factored into a reusable properties utils class. Avoid having this
+ * same snippet of loading code scattered in many places.
+ *
+ * @todo Could also add a build number property for a sequential build number assigned by an automated build system, for
+ * build reproducibility purposes.
+ */
public class QpidProperties
{
+ /** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(QpidProperties.class);
-
+
+ /** The name of the version properties file to load from the class path. */
public static final String VERSION_RESOURCE = "qpidversion.properties";
+ /** Defines the name of the product property. */
public static final String PRODUCT_NAME_PROPERTY = "qpid.name";
+
+ /** Defines the name of the version property. */
public static final String RELEASE_VERSION_PROPERTY = "qpid.version";
+
+ /** Defines the name of the source code revision property. */
public static final String BUILD_VERSION_PROPERTY = "qpid.svnversion";
+ /** Defines the default value for all properties that cannot be loaded. */
private static final String DEFAULT = "unknown";
+ /** Holds the product name. */
private static String productName = DEFAULT;
+
+ /** Holds the product version. */
private static String releaseVersion = DEFAULT;
+
+ /** Holds the source code revision. */
private static String buildVersion = DEFAULT;
- /** Loads the values from the version properties file. */
+ // Loads the values from the version properties file.
static
{
Properties props = new Properties();
@@ -62,16 +95,17 @@
if (_logger.isDebugEnabled())
{
_logger.debug("Dumping QpidProperties");
- for (Map.Entry<Object,Object> entry : props.entrySet())
+ for (Map.Entry<Object, Object> entry : props.entrySet())
{
- _logger.debug("Property: " + entry.getKey() + " Value: "+ entry.getValue());
+ _logger.debug("Property: " + entry.getKey() + " Value: " + entry.getValue());
}
+
_logger.debug("End of property dump");
}
productName = readPropertyValue(props, PRODUCT_NAME_PROPERTY);
releaseVersion = readPropertyValue(props, RELEASE_VERSION_PROPERTY);
- buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY);
+ buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY);
}
}
catch (IOException e)
@@ -81,26 +115,56 @@
}
}
+ /**
+ * Gets the product name.
+ *
+ * @return The product name.
+ */
public static String getProductName()
{
return productName;
}
+ /**
+ * Gets the product version.
+ *
+ * @return The product version.
+ */
public static String getReleaseVersion()
{
return releaseVersion;
}
+ /**
+ * Gets the source code revision.
+ *
+ * @return The source code revision.
+ */
public static String getBuildVersion()
{
return buildVersion;
}
+ /**
+ * Extracts all of the version information as a printable string.
+ *
+ * @return All of the version information as a printable string.
+ */
public static String getVersionString()
{
return getProductName() + " - " + getReleaseVersion() + " build: " + getBuildVersion();
}
+ /**
+ * Helper method to extract a named property from properties.
+ *
+ * @param props The properties.
+ * @param propertyName The named property to extract.
+ *
+ * @return The extracted property or a default value if the properties do not contain the named property.
+ *
+ * @todo A bit pointless.
+ */
private static String readPropertyValue(Properties props, String propertyName)
{
String retVal = (String) props.get(propertyName);
@@ -108,9 +172,16 @@
{
retVal = DEFAULT;
}
+
return retVal;
}
+ /**
+ * Prints the versioning information to the console. This is extremely useful for identifying Qpid code in the
+ * wild, where the origination of the code has been forgotten.
+ *
+ * @param args Does not require any arguments.
+ */
public static void main(String[] args)
{
System.out.println(getVersionString());
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,7 +26,7 @@
import java.lang.annotation.Target;
/**
- * Marks a field as being "configured" externally.
+ * Marks a field as having a "configured" value injected into it by a configurator.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java Thu May 17 06:27:40 2007
@@ -24,7 +24,16 @@
import org.apache.qpid.protocol.AMQConstant;
/**
- * Indicates an error parsing a property expansion.
+ * Indicates a failure to parse a property expansion. See {@link PropertyUtils} for the code that does property
+ * expansions.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to expand a property name into a value.
+ * </table>
+ *
+ * @todo AMQException is to be reserved for protocol related conditions. This exception does not have a status code, so
+ * don't inherit from AMQException.
*/
public class PropertyException extends AMQException
{
@@ -33,6 +42,7 @@
super(message);
}
+ /*
public PropertyException(String msg, Throwable t)
{
super(msg, t);
@@ -47,4 +57,5 @@
{
super(errorCode, msg);
}
+ */
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,24 +24,35 @@
import java.util.Iterator;
/**
- * Based on code in Apache Ant, this utility class handles property expansion. This
- * is most useful in config files and so on.
+ * PropertyUtils provides helper methods for dealing with Java properties.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Expand system properties into strings with named expansions.
+ * </table>
+ *
+ * @todo Make the lookup method generic by passing in the properties to use for the expansion, rather than hard coding
+ * as system properties. The expansion code has greater potential for re-use that way.
+ *
+ * @todo Some more property related code could be added to this utils class, which might more appropriately reside under
+ * org.apache.qpid.util. For example standardised code to load properties from a resource name, currently found in
+ * QpidProperties and possibly other places could be moved here.
*/
public class PropertyUtils
{
/**
- * Replaces <code>${xxx}</code> style constructions in the given value
- * with the string value of the corresponding data types. Replaces only system
- * properties
+ * Given a string that contains substrings of the form <code>${xxx}</code>, looks up the values of 'xxx' as
+ * system properties and substitutes them back into the original string, to provide a property value expanded
+ * string.
*
- * @param value The string to be scanned for property references.
- * May be <code>null</code>, in which case this
+ * @param value The string to be scanned for property references. May be <code>null</code>, in which case this
* method returns immediately with no effect.
- * @return the original string with the properties replaced, or
- * <code>null</code> if the original string is <code>null</code>.
- * @throws PropertyException if the string contains an opening
- * <code>${</code> without a closing
- * <code>}</code>
+ *
+ * @return The original string with the properties replaced, or <code>null</code> if the original string is
+ * <code>null</code>.
+ *
+ * @throws PropertyException If the string contains an opening <code>${</code> without a balancing <code>}</code>,
+ * or if the property to expand does not exist as a system property.
*/
public static String replaceProperties(String value) throws PropertyException
{
@@ -69,11 +80,12 @@
if (replacement == null)
{
- throw new PropertyException("Property ${" + propertyName +
- "} has not been set");
+ throw new PropertyException("Property ${" + propertyName + "} has not been set");
}
+
fragment = replacement;
}
+
sb.append(fragment);
}
@@ -81,32 +93,30 @@
}
/**
- * Default parsing method. Parses the supplied value for properties which are specified
- * using ${foo} syntax. $X is left as is, and $$ specifies a single $.
- * @param value the property string to parse
- * @param fragments is populated with the string fragments. A null means "insert a
- * property value here. The number of nulls in the list when populated is equal to the
- * size of the propertyRefs list
- * @param propertyRefs populated with the property names to be added into the final
- * String.
+ * Parses the supplied value for properties which are specified using ${foo} syntax. $X is left as is, and $$
+ * specifies a single $.
+ *
+ * @param value The property string to parse.
+ * @param fragments Is populated with the string fragments. A null means "insert a property value here". The number
+ * of nulls in the list when populated is equal to the size of the propertyRefs list.
+ * @param propertyRefs Populated with the property names to be added into the final string.
*/
- private static void parsePropertyString(String value, ArrayList<String> fragments,
- ArrayList<String> propertyRefs)
- throws PropertyException
+ private static void parsePropertyString(String value, ArrayList<String> fragments, ArrayList<String> propertyRefs)
+ throws PropertyException
{
int prev = 0;
int pos;
- //search for the next instance of $ from the 'prev' position
+ // search for the next instance of $ from the 'prev' position
while ((pos = value.indexOf("$", prev)) >= 0)
{
- //if there was any text before this, add it as a fragment
+ // if there was any text before this, add it as a fragment
if (pos > 0)
{
fragments.add(value.substring(prev, pos));
}
- //if we are at the end of the string, we tack on a $
- //then move past it
+ // if we are at the end of the string, we tack on a $
+ // then move past it
if (pos == (value.length() - 1))
{
fragments.add("$");
@@ -114,8 +124,8 @@
}
else if (value.charAt(pos + 1) != '{')
{
- //peek ahead to see if the next char is a property or not
- //not a property: insert the char as a literal
+ // peek ahead to see if the next char is a property or not
+ // not a property: insert the char as a literal
if (value.charAt(pos + 1) == '$')
{
// two $ map to one $
@@ -135,22 +145,20 @@
int endName = value.indexOf('}', pos);
if (endName < 0)
{
- throw new PropertyException("Syntax error in property: " +
- value);
+ throw new PropertyException("Syntax error in property: " + value);
}
+
String propertyName = value.substring(pos + 2, endName);
fragments.add(null);
propertyRefs.add(propertyName);
prev = endName + 1;
}
}
- //no more $ signs found
- //if there is any tail to the file, append it
+ // no more $ signs found
+ // if there is any tail to the file, append it
if (prev < value.length())
{
fragments.add(value.substring(prev));
}
}
-
-
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,25 +22,44 @@
import org.apache.qpid.framing.AMQShortString;
+/**
+ * Defines the names of the standard AMQP exchanges that every AMQP broker should provide. These exchange names
+ * and types are given in the specification.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Defines the standard AMQP exchange names.
+ * <tr><td> Defines the standard AMQP exchange types.
+ * </table>
+ *
+ * @todo A type safe enum, might be more appropriate for the exchange types.
+ */
public class ExchangeDefaults
{
- public final static AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>");
-
- public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic");
-
- public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic");
+ /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */
+ public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>");
- public final static AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct");
+ /** The pre-defined topic exchange, the broker SHOULD provide this. */
+ public static final AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic");
- public final static AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct");
+ /** Defines the identifying type name of topic exchanges. */
+ public static final AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic");
- public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match");
+ /** The pre-defined direct exchange, the broker MUST provide this. */
+ public static final AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct");
- public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers");
+ /** Defines the identifying type name of direct exchanges. */
+ public static final AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct");
- public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout");
+ /** The pre-defined headers exchange, the specification does not say this needs to be provided. */
+ public static final AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match");
- public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
+ /** Defines the identifying type name of headers exchanges. */
+ public static final AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers");
+ /** The pre-defined fanout exchange, the broker MUST provide this. */
+ public static final AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout");
+ /** Defines the identifying type name of fanout exchanges. */
+ public static final AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,18 +23,51 @@
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoSession;
-
-abstract public class Event
+/**
+ * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain
+ * for later processing. It is an abstract class, with different implementations for continuations of different kinds
+ * of Mina events.
+ *
+ * <p/>These continuations are typically batched by {@link Job} for processing by a worker thread pool.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Process a continuation in the context of a Mina session.
+ * </table>
+ *
+ * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static
+ * to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of
+ * the context of the outer Event class. The inner class construction used here is preventing common code re-use
+ * (though not by a huge amount), but makes for an inelegant way of handling inheritance and doesn't seem like
+ * a justifiable use of inner classes. Move the inner classes out into their own files.
+ *
+ * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as
+ * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract,
+ * it is really an interface, so could just drop it and use the continuation interface instead.
+ */
+public abstract class Event
{
-
+ /**
+ * Creates a continuation.
+ */
public Event()
- {
- }
-
-
- abstract public void process(IoSession session);
-
+ { }
+ /**
+ * Processes the continuation in the context of a Mina session.
+ *
+ * @param session The Mina session.
+ */
+ public abstract void process(IoSession session);
+
+ /**
+ * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
+ * </table>
+ */
public static final class ReceivedEvent extends Event
{
private final Object _data;
@@ -59,7 +92,15 @@
}
}
-
+ /**
+ * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Pass a Mina filterWrite event to a NextFilter.
+ * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession}
+ * </table>
+ */
public static final class WriteEvent extends Event
{
private final IoFilter.WriteRequest _data;
@@ -72,7 +113,6 @@
_data = data;
}
-
public void process(IoSession session)
{
_nextFilter.filterWrite(session, _data);
@@ -84,8 +124,14 @@
}
}
-
-
+ /**
+ * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Pass a Mina sessionClosed event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
+ * </table>
+ */
public static final class CloseEvent extends Event
{
private final IoFilter.NextFilter _nextFilter;
@@ -96,7 +142,6 @@
_nextFilter = nextFilter;
}
-
public void process(IoSession session)
{
_nextFilter.sessionClosed(session);
@@ -107,5 +152,4 @@
return _nextFilter;
}
}
-
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,48 +26,77 @@
import org.apache.mina.common.IoSession;
/**
- * Holds events for a session that will be processed asynchronously by
- * the thread pool in PoolingFilter.
+ * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
+ * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially
+ * processing all of its aggregated events.
+ *
+ * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when
+ * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events
+ * may be processed in each run of the job, several runs may be required to clear the queue.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Aggregate many continuations together into a single continuation.
+ * <tr><td> Sequentially process aggregated continuations. <td> {@link Event}
+ * <tr><td> Provide running and completion status of the aggregate continuation.
+ * <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler}
+ * </table>
+ *
+ * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a
+ * continuation. Job is a continuation that aggregates other continuations and as such is a useful re-usable
+ * piece of code. There may be other places than the mina filter chain where continuation batching is used within
+ * qpid, so abstracting this out could provide a useful building block. This also opens the way to different
+ * kinds of job with a common interface, e.g. parallel or sequential jobs etc.
+ *
+ * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
*/
public class Job implements Runnable
{
+ /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
+
+ /** The Mina session. */
private final IoSession _session;
+
+ /** Holds the queue of events that make up the job. */
private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+
+ /** Holds a status flag, that indicates when the job is actively running. */
private final AtomicBoolean _active = new AtomicBoolean();
- //private final AtomicInteger _refCount = new AtomicInteger();
+
+ /** Holds the completion continuation, called upon completion of a run of the job. */
private final JobCompletionHandler _completionHandler;
+ /**
+ * Creates a new job that aggregates many continuations together.
+ *
+ * @param session The Mina session.
+ * @param completionHandler The per job run, terminal continuation.
+ * @param maxEvents The maximum number of aggregated continuations to process per run of the job.
+ */
Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
{
_session = session;
_completionHandler = completionHandler;
_maxEvents = maxEvents;
}
-//
-// void acquire()
-// {
-// _refCount.incrementAndGet();
-// }
-//
-// void release()
-// {
-// _refCount.decrementAndGet();
-// }
-//
-// boolean isReferenced()
-// {
-// return _refCount.get() > 0;
-// }
+ /**
+ * Enqueues a continuation for sequential processing by this job.
+ *
+ * @param evt The continuation to enqueue.
+ */
void add(Event evt)
{
_eventQueue.add(evt);
}
+ /**
+ * Sequentially processes, up to the maximum number per job, the aggregated continuations enqueued in this job.
+ */
void processAll()
{
- //limit the number of events processed in one run
+ // limit the number of events processed in one run
for (int i = 0; i < _maxEvents; i++)
{
Event e = _eventQueue.poll();
@@ -82,21 +111,37 @@
}
}
- boolean isComplete()
+ /**
+ * Tests if there are no more enqueued continuations to process.
+ *
+ * @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise.
+ */
+ public boolean isComplete()
{
return _eventQueue.peek() == null;
}
- boolean activate()
+ /**
+ * Marks this job as active if it is inactive. This method is thread safe.
+ *
+ * @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise.
+ */
+ public boolean activate()
{
return _active.compareAndSet(false, true);
}
- void deactivate()
+ /**
+ * Marks this job as inactive. This method is thread safe.
+ */
+ public void deactivate()
{
_active.set(false);
}
+ /**
+ * Processes a batch of aggregated continuations, marks this job as inactive and calls the terminal continuation.
+ */
public void run()
{
processAll();
@@ -104,7 +149,12 @@
_completionHandler.completed(_session, this);
}
-
+ /**
+ * Another interface for a continuation.
+ *
+ * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
+ * Runnable or a custom Continuation interface.
+ */
static interface JobCompletionHandler
{
public void completed(IoSession session, Job job);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu May 17 06:27:40 2007
@@ -31,22 +31,131 @@
import org.apache.qpid.pool.Event.CloseEvent;
-public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
+/**
+ * PoolingFilter is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
+ * adds no behaviour by default to the filter chain, it is abstract.
+ *
+ * <p/>PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by
+ * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is
+ * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as
+ * a buffer between stages of the pipeline.
+ *
+ * <p/>There are two convenience methods, {@link #createAynschReadPoolingFilter} and
+ * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and
+ * 'filterWrite' events, making it possible to process these event streams separately.
+ *
+ * <p/>Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the
+ * Mina session they are working with, and store it in the session against their identifying name. This allows different
+ * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their
+ * workloads in different jobs.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Implement default, pass through filter.
+ * <tr><td> Create pooling filters and a specific thread pool. <td> {@link ReferenceCountingExecutorService}
+ * <tr><td> Provide the ability to batch Mina events for asynchronous processing. <td> {@link Job}, {@link Event}
+ * <tr><td> Provide a terminal continuation to keep jobs running till empty.
+ * <td> {@link Job}, {@link Job.JobCompletionHandler}
+ * </table>
+ *
+ * @todo This seems a bit bizarre. ReadWriteThreadModel creates separate pooling filters for read and write events.
+ * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
+ * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
+ * so there is concurrency. But why go to the trouble of separating out the read and write events in that case?
+ * Why not just batch them into jobs together? Perhaps it's so that separate thread pools could be used for these
+ * stages.
+ *
+ * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
+ * it. It's just that it runs them 10 at a time, but the completion handler here checks if there are more to run
+ * and trips off another batch of 10 until they are all done. Why not just have a straightforward
+ * consumer/producer queue scenario without the batches of 10?
+ *
+ * @todo The static helper methods are pointless. Could just call new.
+ */
+public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
{
+ /** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
+ /** Holds a mapping from Mina sessions to batched jobs for execution. */
private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
+
+ /** Holds the managed reference to obtain the executor for the batched jobs. */
private final ReferenceCountingExecutorService _poolReference;
+ /** Used to hold a name for identifying different pooling filter types. */
private final String _name;
+
+ /** Defines the maximum number of events that will be batched into a single job. */
private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+ /**
+ * Creates a named pooling filter, on the specified shared thread pool.
+ *
+ * @param refCountingPool The thread pool reference.
+ * @param name The identifying name of the filter type.
+ */
public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
_poolReference = refCountingPool;
_name = name;
}
+ /**
+ * Helper method to get an instance of a pooling filter that handles read events asynchronously.
+ *
+ * @param refCountingPool A managed reference to the thread pool.
+ * @param name The filter type's identifying name.
+ *
+ * @return A pooling filter for asynchronous read events.
+ */
+ public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ return new AsynchReadPoolingFilter(refCountingPool, name);
+ }
+
+ /**
+ * Helper method to get an instance of a pooling filter that handles write events asynchronously.
+ *
+ * @param refCountingPool A managed reference to the thread pool.
+ * @param name The filter type's identifying name.
+ *
+ * @return A pooling filter for asynchronous write events.
+ */
+ public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ return new AsynchWritePoolingFilter(refCountingPool, name);
+ }
+
+ /**
+ * Called by Mina to initialize this filter. Takes a reference to the thread pool.
+ */
+ public void init()
+ {
+ _logger.info("Init called on PoolingFilter " + toString());
+
+ // Called when the filter is initialised in the chain. If the reference count is
+ // zero this acquire will initialise the pool.
+ _poolReference.acquireExecutorService();
+ }
+
+ /**
+ * Called by Mina to clean up this filter. Releases the reference to the thread pool.
+ */
+ public void destroy()
+ {
+ _logger.info("Destroy called on PoolingFilter " + toString());
+
+ // When the reference count gets to zero we release the executor service.
+ _poolReference.releaseExecutorService();
+ }
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param session The Mina session to work in.
+ * @param event The event to hand off asynchronously.
+ */
void fireAsynchEvent(IoSession session, Event event)
{
Job job = getJobForSession(session);
@@ -62,31 +171,50 @@
}
+ /**
+ * Creates a Job on the Mina session, identified by this filter's name, in which this filter places asynchronously
+ * handled events.
+ *
+ * @param session The Mina session.
+ */
public void createNewJobForSession(IoSession session)
{
Job job = new Job(session, this, _maxEvents);
session.setAttribute(_name, job);
}
+ /**
+ * Retrieves this filter's Job, by this filter's name, from the Mina session.
+ *
+ * @param session The Mina session.
+ *
+ * @return The Job for this filter to place asynchronous events into.
+ */
private Job getJobForSession(IoSession session)
{
return (Job) session.getAttribute(_name);
}
- private Job createJobForSession(IoSession session)
+ /*private Job createJobForSession(IoSession session)
{
return addJobForSession(session, new Job(session, this, _maxEvents));
- }
+ }*/
- private Job addJobForSession(IoSession session, Job job)
+ /*private Job addJobForSession(IoSession session, Job job)
{
// atomic so ensures all threads agree on the same job
Job existing = _jobs.putIfAbsent(session, job);
return (existing == null) ? job : existing;
- }
+ }*/
- // Job.JobCompletionHandler
+ /**
+ * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+ * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
+ *
+ * @param session The Mina session to work in.
+ * @param job The job that completed.
+ */
public void completed(IoSession session, Job job)
{
// if (job.isComplete())
@@ -109,129 +237,228 @@
}
}
- // IoFilter methods that are processed by threads on the pool
-
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
{
nextFilter.sessionOpened(session);
}
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
{
nextFilter.sessionClosed(session);
}
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ * @param status The session idle status.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception
{
nextFilter.sessionIdle(session, status);
}
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ * @param cause The underlying exception.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception
{
nextFilter.exceptionCaught(session, cause);
}
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ * @param message The message received.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
{
nextFilter.messageReceived(session, message);
}
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ * @param message The message sent.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
{
nextFilter.messageSent(session, message);
}
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ * @param writeRequest The write request event.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
throws Exception
{
nextFilter.filterWrite(session, writeRequest);
}
- // IoFilter methods that are processed on current thread (NOT on pooled thread)
-
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
public void filterClose(NextFilter nextFilter, IoSession session) throws Exception
{
nextFilter.filterClose(session);
}
- public void sessionCreated(NextFilter nextFilter, IoSession session)
+ /**
+ * No-op pass through filter to the next filter in the chain.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ *
+ * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
+ * overriding sub-classes the ability to.
+ */
+ public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception
{
nextFilter.sessionCreated(session);
}
+ /**
+ * Prints the filter type's identifying name to a string, mainly for debugging purposes.
+ *
+ * @return The filter type's identifying name.
+ */
public String toString()
{
return _name;
}
- // LifeCycle methods
-
- public void init()
- {
- _logger.info("Init called on PoolingFilter " + toString());
- // called when the filter is initialised in the chain. If the reference count is
- // zero this acquire will initialise the pool
- _poolReference.acquireExecutorService();
- }
-
- public void destroy()
- {
- _logger.info("Destroy called on PoolingFilter " + toString());
- // when the reference count gets to zero we release the executor service
- _poolReference.releaseExecutorService();
- }
-
+ /**
+ * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events
+ * asynchronously.
+ */
public static class AsynchReadPoolingFilter extends PoolingFilter
{
-
+ /**
+ * Creates a pooling filter that handles read events asynchronously.
+ *
+ * @param refCountingPool A managed reference to the thread pool.
+ * @param name The filter type's identifying name.
+ */
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
super(refCountingPool, name);
}
- public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message)
- throws Exception
+ /**
+ * Hands off this event for asynchronous execution.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ * @param message The message received.
+ */
+ public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
{
fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
}
- public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+ /**
+ * Hands off this event for asynchronous execution.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ */
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
fireAsynchEvent(session, new CloseEvent(nextFilter));
}
-
}
+ /**
+ * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events
+ * asynchronously.
+ */
public static class AsynchWritePoolingFilter extends PoolingFilter
{
-
+ /**
+ * Creates a pooling filter that handles write events asynchronously.
+ *
+ * @param refCountingPool A managed reference to the thread pool.
+ * @param name The filter type's identifying name.
+ */
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
super(refCountingPool, name);
}
+ /**
+ * Hands off this event for asynchronous execution.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ * @param writeRequest The write request event.
+ */
public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
- throws Exception
{
fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
}
- public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+ /**
+ * Hands off this event for asynchronous execution.
+ *
+ * @param nextFilter The next filter in the chain.
+ * @param session The Mina session.
+ */
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
fireAsynchEvent(session, new CloseEvent(nextFilter));
}
-
- }
-
- public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- return new AsynchReadPoolingFilter(refCountingPool, name);
}
-
- public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- return new AsynchWritePoolingFilter(refCountingPool, name);
- }
-
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,14 +24,34 @@
import org.apache.mina.common.ThreadModel;
import org.apache.mina.filter.ReferenceCountingIoFilter;
+/**
+ * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with separate filters to
+ * handle read and write events. The separate filters are {@link PoolingFilter}s, which have thread pools to handle
+ * these events. The effect of this is that reading and writing may happen concurrently.
+ *
+ * <p/>Socket i/o will only happen with concurrent reads and writes if Mina has separate selector threads for each.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create a filter chain with separate read and write thread pools for read/write Mina events.
+ * <td> {@link PoolingFilter}
+ * </table>
+ */
public class ReadWriteThreadModel implements ThreadModel
{
-
+ /** Holds the singleton instance of this factory. */
private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
+ /** Holds the thread pooling filter for reads. */
private final PoolingFilter _asynchronousReadFilter;
+
+ /** Holds the thread pooling filter for writes. */
private final PoolingFilter _asynchronousWriteFilter;
+ /**
+ * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that
+ * only a singleton instance of the factory is ever created.
+ */
private ReadWriteThreadModel()
{
final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
@@ -39,25 +59,44 @@
_asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
}
+ /**
+ * Gets the singleton instance of this filter chain factory.
+ *
+ * @return The singleton instance of this filter chain factory.
+ */
+ public static ReadWriteThreadModel getInstance()
+ {
+ return _instance;
+ }
+
+ /**
+ * Gets the read filter.
+ *
+ * @return The read filter.
+ */
public PoolingFilter getAsynchronousReadFilter()
{
return _asynchronousReadFilter;
}
+ /**
+ * Gets the write filter.
+ *
+ * @return The write filter.
+ */
public PoolingFilter getAsynchronousWriteFilter()
{
return _asynchronousWriteFilter;
}
- public void buildFilterChain(IoFilterChain chain) throws Exception
+ /**
+ * Adds the concurrent read and write filters to a filter chain.
+ *
+ * @param chain The Mina filter chain to add to.
+ */
+ public void buildFilterChain(IoFilterChain chain)
{
-
chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
- }
-
- public static ReadWriteThreadModel getInstance()
- {
- return _instance;
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Thu May 17 06:27:40 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,44 +24,87 @@
import java.util.concurrent.Executors;
/**
- * We share the executor service among several PoolingFilters. This class reference counts
- * how many filter chains are using the executor service and destroys the service, thus
- * freeing up its threads, when the count reaches zero. It recreates the service when
- * the count is incremented.
+ * ReferenceCountingExecutorService wraps an ExecutorService in order to provide a shared reference to it. It counts
+ * the references taken, instantiating the service on the first reference, and shutting it down when the last
+ * reference is released.
+ *
+ * <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM
+ * from terminating due to the existence of non-daemon threads.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a shared executor service. <td> {@link Executors}
+ * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
+ * <tr><td> Track references to the executor service.
+ * <tr><td> Provide configuration of the executor service.
+ * </table>
+ *
+ * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the
+ * implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be
+ * generalized to reference count anything. That is, on first instance call a create method, on release of last
+ * instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a
+ * reference counting factory. It could then be re-used to do reference counting in other places (such as
+ * messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the
+ * ref counting factory can call to manage the lifecycle.
*
- * This is particularly important on the client where failing to destroy the executor
- * service prevents the JVM from shutting down due to the existence of non-daemon threads.
+ * @todo {@link #_poolSize} should be static?
*
+ * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
+ * further checks are applied to ensure that the executor service has not been shut down. This passes responsibility
+ * for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
+ * here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
+ * isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
*/
public class ReferenceCountingExecutorService
{
+ /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
private static final int MINIMUM_POOL_SIZE = 4;
+
+ /** Holds the number of processors on the machine. */
private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
+
+ /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */
private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);
/**
- * We need to be able to check the current reference count and if necessary
- * create the executor service atomically.
+ * Holds the singleton instance of this reference counter. This is only created once, statically, so the
+ * {@link #getInstance()} method does not need to be synchronized.
*/
private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();
+ /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */
private final Object _lock = new Object();
+ /** The shared executor service that is reference counted. */
private ExecutorService _pool;
+ /** Holds the number of references given out to the executor service. */
private int _refCount = 0;
+ /** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ /**
+ * Retrieves the singleton instance of this reference counter.
+ *
+ * @return The singleton instance of this reference counter.
+ */
public static ReferenceCountingExecutorService getInstance()
{
return _instance;
}
+ /**
+ * Private constructor to ensure that only a singleton instance can be created.
+ */
private ReferenceCountingExecutorService()
- {
- }
+ { }
+ /**
+ * Provides a reference to a shared executor service, incrementing the reference count.
+ *
+ * @return An executor service.
+ */
ExecutorService acquireExecutorService()
{
synchronized (_lock)
@@ -70,10 +113,15 @@
{
_pool = Executors.newFixedThreadPool(_poolSize);
}
+
return _pool;
}
}
+ /**
+ * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls
+ * to zero, the executor service is shut down.
+ */
void releaseExecutorService()
{
synchronized (_lock)
@@ -86,10 +134,9 @@
}
/**
- * The filters that use the executor service should call this method to get access
- * to the service. Note that this method does not alter the reference count.
+ * Provides access to the executor service, without touching the reference count.
*
- * @return the underlying executor service
+ * @return The shared executor service, or <tt>null</tt> if none has been instantiated yet.
*/
public ExecutorService getPool()
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java?view=diff&rev=538914&r1=538913&r2=538914
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java Thu May 17 06:27:40 2007
@@ -36,7 +36,7 @@
* that they take a default value when not set. Options may be mandatory in wich case it is an error not to specify
* them on the command line. Flags are never mandatory because they are implicitly set to false when not specified.
*
- * <p/>Some examples command line are:
+ * <p/>Some example command lines are:
*
* <ul>
* <li>This one has two options that expect arguments: