You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/12/08 19:15:41 UTC
svn commit: r724432 [1/4] - in
/webservices/commons/trunk/modules/transport/modules:
base/src/main/java/org/apache/axis2/transport/base/ jms/
jms/src/main/java/org/apache/axis2/transport/jms/
testkit/src/main/java/org/apache/axis2/transport/testkit/tes...
Author: asankha
Date: Mon Dec 8 10:15:40 2008
New Revision: 724432
URL: http://svn.apache.org/viewvc?rev=724432&view=rev
Log:
merging JMS transport enhancements from branch webservices/commons/trunk/scratch/asankha/
supports JTA and JMS local transactions
supports dynamic scaling and many advanced JMS options
Added:
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
webservices/commons/trunk/modules/transport/modules/jms/pom.xml
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
webservices/commons/trunk/modules/transport/modules/tests/log4j.properties
webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java
webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java
webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java
webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java
Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java (original)
+++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java Mon Dec 8 10:15:40 2008
@@ -171,6 +171,14 @@
}
return result;
}
+
+ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+ return getEPRsForService(serviceName);
+ }
+
+ protected EndpointReference[] getEPRsForService(String serviceName) {
+ return null;
+ }
private boolean ignoreService(AxisService service) {
return service.getName().startsWith("__"); // these are "private" services
Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java (original)
+++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java Mon Dec 8 10:15:40 2008
@@ -32,6 +32,7 @@
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.description.WSDL2Constants;
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.axiom.om.util.UUIDGenerator;
import javax.management.MBeanServer;
@@ -58,6 +59,13 @@
private int state = BaseConstants.STOPPED;
/**
+ * A constructor that makes subclasses pick up the correct logger
+ */
+ protected AbstractTransportSender() {
+ log = LogFactory.getLog(this.getClass());
+ }
+
+ /**
* Initialize the generic transport sender.
*
* @param cfgCtx the axis configuration context
Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java (original)
+++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java Mon Dec 8 10:15:40 2008
@@ -96,4 +96,31 @@
// this is an property required by axis2
// FIXME: where is this required in Axis2?
public final static String MAIL_CONTENT_TYPE = "mail.contenttype";
+
+ /** Service transaction level - non-transactional */
+ public static final int TRANSACTION_NONE = 0;
+ /** Service transaction level - use non-JTA (i.e. local) transactions */
+ public static final int TRANSACTION_LOCAL = 1;
+ /** Service transaction level - use JTA transactions */
+ public static final int TRANSACTION_JTA = 2;
+ /** Service transaction level - non-transactional */
+ public static final String STR_TRANSACTION_NONE = "none";
+ /** Service transaction level - use non-JTA (i.e. local) transactions */
+ public static final String STR_TRANSACTION_LOCAL = "local";
+ /** Service transaction level - use JTA transactions */
+ public static final String STR_TRANSACTION_JTA = "jta";
+
+ /** The Parameter name indicating the transactionality of a service */
+ public static final String PARAM_TRANSACTIONALITY = "transport.Transactionality";
+ /** Parameter name indicating the JNDI name to get a UserTransaction from JNDI */
+ public static final String PARAM_USER_TXN_JNDI_NAME = "transport.UserTxnJNDIName";
+ /** Parameter that indicates if a UserTransaction reference could be cached - default yes */
+ public static final String PARAM_CACHE_USER_TXN = "transport.CacheUserTxn";
+
+ /** The UserTransaction associated with this message */
+ public static final String USER_TRANSACTION = "UserTransaction";
+ /** A message level property indicating a request to rollback the transaction associated with the message */
+ public static final String SET_ROLLBACK_ONLY = "SET_ROLLBACK_ONLY";
+ /** A message level property indicating a commit is required after the next immediate send over a transport */
+ public static final String JTA_COMMIT_AFTER_SEND = "JTA_COMMIT_AFTER_SEND";
}
Modified: webservices/commons/trunk/modules/transport/modules/jms/pom.xml
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/pom.xml?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/pom.xml (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/pom.xml Mon Dec 8 10:15:40 2008
@@ -76,6 +76,12 @@
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>${jms-1.1-spec.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+ <version>${jta-spec.version}</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
@@ -95,6 +101,7 @@
<commons.logging.version>1.1</commons.logging.version>
<axis2-transport-base.version>SNAPSHOT</axis2-transport-base.version>
<jms-1.1-spec.version>1.1</jms-1.1-spec.version>
+ <jta-spec.version>1.0</jta-spec.version>
</properties>
</project>
\ No newline at end of file
Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Mon Dec 8 10:15:40 2008
@@ -21,6 +21,10 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.transport.base.BaseUtils;
import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
+import org.apache.axis2.AxisFault;
+import org.apache.axiom.om.OMElement;
import javax.jms.*;
import javax.naming.Context;
@@ -33,503 +37,358 @@
/**
* Encapsulate a JMS Connection factory definition within an Axis2.xml
- * <p/>
- * More than one JMS connection factory could be defined within an Axis2 XML
- * specifying the JMSListener as the transportReceiver.
- * <p/>
- * These connection factories are created at the initialization of the
- * transportReceiver, and any service interested in using any of these could
- * specify the name of the factory and the destination through Parameters named
- * JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below.
- * <p/>
- * <parameter name="transport.jms.ConnectionFactory" locked="true">myQueueConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="true">TestQueue</parameter>
- * <p/>
- * If a connection factory is defined by a parameter named
- * JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not
- * explicitly specify a connection factory will be defaulted to it - if it is
- * defined in the Axis2 configuration.
- * <p/>
- * e.g.
- * <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
- * <parameter name="myTopicConnectionFactory" locked="false">
- * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
- * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
- * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="false">myTopicOne, myTopicTwo</parameter>
- * </parameter>
- * <parameter name="myQueueConnectionFactory" locked="false">
- * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
- * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
- * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="false">myQueueOne, myQueueTwo</parameter>
- * </parameter>
- * <parameter name="default" locked="false">
- * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
- * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
- * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="false">myDestinationOne, myDestinationTwo</parameter>
- * </parameter>
- * </transportReceiver>
+ *
+ * JMS Connection Factory definitions, allows JNDI properties as well as other service
+ * level parameters to be defined, and re-used by each service that binds to it
+ *
+ * When used for sending messages out, JMSConnectionFactory instances are able to cache
+ * a Connection, Session or Producer
*/
-public class JMSConnectionFactory implements ExceptionListener {
+public class JMSConnectionFactory {
private static final Log log = LogFactory.getLog(JMSConnectionFactory.class);
/** The name used for the connection factory definition within Axis2 */
private String name = null;
- /** The JMS transport listener instance. */
- private final JMSListener jmsListener;
- /** The worker pool to use. */
- private final WorkerPool workerPool;
- /** The JNDI name of the actual connection factory */
- private String connFactoryJNDIName = null;
- /** Map of destination JNDI names to endpoints */
- private Map<String,JMSEndpoint> endpointJNDINameMapping = null;
- /** JMS Sessions currently active. One session for each Destination / Service */
- private Map<String,Session> jmsSessions = null;
- /** Properties of the connection factory to acquire the initial context */
- private Hashtable<String,String> jndiProperties = null;
- /** The JNDI Context used - created using the properties */
+ /** The list of parameters from the axis2.xml definition */
+ private Hashtable<String, String> parameters = new Hashtable<String, String>();
+
+ /** The cached InitialContext reference */
private Context context = null;
- /** The actual ConnectionFactory instance held within */
+ /** The JMS ConnectionFactory this definition refers to */
private ConnectionFactory conFactory = null;
- /** The JMS connection factory type */
- private String connectionFactoryType = null;
- /** The JMS Connection opened */
- private Connection connection = null;
- /** The axis2 configuration context */
- private ConfigurationContext cfgCtx = null;
- /** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
- private long reconnectTimeout = 30000;
-
- /**
- * Create a JMSConnectionFactory for the given [axis2] name the
- * JNDI name of the actual ConnectionFactory
- *
- * @param name the connection factory name specified in the axis2.xml for the
- * TransportListener or the TransportSender using this
- * @param jmsListener the JMS transport listener, or null if the connection factory
- * is not linked to a transport listener
- * @param workerPool the worker pool to be used to process incoming messages; may be null
- * @param cfgCtx the axis2 configuration context
- */
- public JMSConnectionFactory(String name, JMSListener jmsListener, WorkerPool workerPool,
- ConfigurationContext cfgCtx) {
- this.name = name;
- this.jmsListener = jmsListener;
- this.workerPool = workerPool;
- this.cfgCtx = cfgCtx;
- endpointJNDINameMapping = new HashMap<String,JMSEndpoint>();
- jndiProperties = new Hashtable<String,String>();
- jmsSessions = new HashMap<String,Session>();
- }
-
-
- /**
- * Add a listen destination on this connection factory on behalf of the given service
- *
- * @param endpoint the {@link JMSEndpoint} object that specifies the destination and
- * the service
- */
- public void addDestination(JMSEndpoint endpoint) {
- String destinationJNDIName = endpoint.getJndiDestinationName();
- String destinationName = getPhysicalDestinationName(destinationJNDIName);
-
- if (destinationName == null) {
- log.warn("JMS Destination with JNDI name : " + destinationJNDIName + " does not exist");
-
- try {
- log.info("Creating a JMS Queue with the JNDI name : " + destinationJNDIName +
- " using the connection factory definition named : " + name);
- JMSUtils.createDestination(conFactory, destinationJNDIName, endpoint.getDestinationType());
-
- destinationName = getPhysicalDestinationName(destinationJNDIName);
-
- } catch (JMSException e) {
- log.error("Unable to create Destination with JNDI name : " + destinationJNDIName, e);
- BaseUtils.markServiceAsFaulty(
- endpoint.getServiceName(),
- "Error creating JMS destination : " + destinationJNDIName,
- cfgCtx.getAxisConfiguration());
- return;
- }
- }
-
- endpointJNDINameMapping.put(destinationJNDIName, endpoint);
-
- log.info("Mapped JNDI name : " + destinationJNDIName + " and JMS Destination name : " +
- destinationName + " against service : " + endpoint.getServiceName());
- }
+ /** The shared JMS Connection for this JMS connection factory */
+ private Connection sharedConnection = null;
+ /** The shared JMS Session for this JMS connection factory */
+ private Session sharedSession = null;
+ /** The shared JMS MessageProducer for this JMS connection factory */
+ private MessageProducer sharedProducer = null;
+ /** The Shared Destination */
+ private Destination sharedDestination = null;
+ /** The cache level applicable to this JMS connection factory - defaults to caching the Connection */
+ private int cacheLevel = JMSConstants.CACHE_CONNECTION;
/**
- * Abort listening on the JMS destination from this connection factory
- *
- * @param jndiDestinationName the JNDI name of the JMS destination to be removed
+ * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct this JMSConnectionFactory
+ * @param parameter the axis2.xml 'Parameter' that defined the JMS CF
*/
- public void removeDestination(String jndiDestinationName) {
- stoplisteningOnDestination(jndiDestinationName);
- endpointJNDINameMapping.remove(jndiDestinationName);
- }
+ public JMSConnectionFactory(Parameter parameter) {
- /**
- * Begin [or restart] listening for messages on the list of destinations associated
- * with this connection factory. (Called during Axis2 initialization of
- * the Transport receivers, or after a disconnection has been detected)
- *
- * When called from the JMS transport sender, this call simply acquires the actual
- * JMS connection factory from the JNDI, creates a new connection and starts it.
- *
- * @throws JMSException on exceptions
- * @throws NamingException on exceptions
- */
- public synchronized void connectAndListen() throws JMSException, NamingException {
+ this.name = parameter.getName();
+ ParameterIncludeImpl pi = new ParameterIncludeImpl();
- // if this is a reconnection/re-initialization effort after the detection of a
- // disconnection, close all sessions and the CF connection and re-initialize
- if (connection != null) {
- log.info("Re-initializing the JMS connection factory : " + name);
-
- for (Session session : jmsSessions.values()) {
- try {
- session.close();
- } catch (JMSException ignore) {}
- }
- try {
- connection.stop();
- } catch (JMSException ignore) {}
-
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Initializing the JMS connection factory : " + name);
- }
+ try {
+ pi.deserializeParameters((OMElement) parameter.getValue());
+ } catch (AxisFault axisFault) {
+ handleException("Error reading parameters for JMS connection factory" + name, axisFault);
}
- // get the CF reference freshly [again] from JNDI
- context = new InitialContext(jndiProperties);
- conFactory = JMSUtils.lookup(context, ConnectionFactory.class, connFactoryJNDIName);
- log.info("Connected to the JMS connection factory : " + connFactoryJNDIName);
+ for (Object o : pi.getParameters()) {
+ Parameter p = (Parameter) o;
+ parameters.put(p.getName(), (String) p.getValue());
+ }
+ digestCacheLevel();
try {
- connection = JMSUtils.createConnection(conFactory,
- jndiProperties.get(Context.SECURITY_PRINCIPAL),
- jndiProperties.get(Context.SECURITY_CREDENTIALS),
- getConnectionFactoryType());
-
- connection.setExceptionListener(this);
-
- } catch (JMSException e) {
- handleException("Error connecting to Connection Factory : " + connFactoryJNDIName, e);
- }
+ context = new InitialContext(parameters);
+ conFactory = JMSUtils.lookup(context, ConnectionFactory.class,
+ parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME));
+ if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) {
+ sharedDestination = JMSUtils.lookup(context, Destination.class,
+ parameters.get(JMSConstants.PARAM_DESTINATION));
+ }
+ log.info("JMS ConnectionFactory : " + name + " initialized");
- for (JMSEndpoint endpoint : endpointJNDINameMapping.values()) {
- startListeningOnDestination(endpoint);
+ } catch (NamingException e) {
+ throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " +
+ parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " +
+ parameters.get(JMSConstants.PARAM_DESTINATION) +
+ " for JMS CF : " + name + " using : " + parameters);
}
-
- connection.start(); // indicate readiness to start receiving messages
- log.info("Connection factory : " + name + " initialized...");
}
/**
- * Create a session for sending to the given destination and save it on the jmsSessions Map
- * keyed by the destination JNDI name
- * @param destinationJNDIname the destination JNDI name
- * @return a JMS Session to send messages to the destination using this connection factory
+ * Digest the cache level parameter, if one is specified
*/
- public Session getSessionForDestination(String destinationJNDIname) {
-
- Session session = jmsSessions.get(destinationJNDIname);
+ private void digestCacheLevel() {
- if (session == null) {
- try {
- Destination dest = getPhysicalDestination(destinationJNDIname);
+ String key = JMSConstants.PARAM_CACHE_LEVEL;
+ String val = parameters.get(key);
- if (dest instanceof Topic) {
- session = ((TopicConnection) connection).
- createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- } else {
- session = ((QueueConnection) connection).
- createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- jmsSessions.put(destinationJNDIname, session);
-
- } catch (JMSException e) {
- handleException("Unable to create a session using connection factory : " + name, e);
- }
+ if ("none".equalsIgnoreCase(val)) {
+ this.cacheLevel = JMSConstants.CACHE_NONE;
+ } else if ("connection".equalsIgnoreCase(val)) {
+ this.cacheLevel = JMSConstants.CACHE_CONNECTION;
+ } else if ("session".equals(val)){
+ this.cacheLevel = JMSConstants.CACHE_SESSION;
+ } else if ("producer".equals(val)) {
+ this.cacheLevel = JMSConstants.CACHE_PRODUCER;
+ } else if (val != null) {
+ throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name);
}
- return session;
}
/**
- * Listen on the given destination from this connection factory. Used to
- * start listening on a destination associated with a newly deployed service
- *
- * @param endpoint the JMS destination to listen on
- */
- public void startListeningOnDestination(JMSEndpoint endpoint) {
- String destinationJNDIname = endpoint.getJndiDestinationName();
- String destinationType = endpoint.getDestinationType();
- Session session = jmsSessions.get(destinationJNDIname);
- // if we already had a session open, close it first
- if (session != null) {
- try {
- session.close();
- } catch (JMSException ignore) {}
- }
-
- try {
- session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE, destinationType);
- Destination destination = null;
-
- try {
- destination = JMSUtils.lookup(context, Destination.class, destinationJNDIname);
-
- } catch (NameNotFoundException e) {
- log.warn("Cannot find destination : " + destinationJNDIname + ". Creating a Queue");
- destination = JMSUtils.createDestination(session, destinationJNDIname, destinationType);
- }
-
- MessageConsumer consumer = JMSUtils.createConsumer(session, destination);
- consumer.setMessageListener(new JMSMessageReceiver(jmsListener, this, workerPool,
- cfgCtx, endpoint));
- jmsSessions.put(destinationJNDIname, session);
-
- // catches NameNotFound and JMSExceptions and marks service as faulty
- } catch (Exception e) {
- if (session != null) {
- try {
- session.close();
- } catch (JMSException ignore) {}
- }
-
- BaseUtils.markServiceAsFaulty(
- endpoint.getServiceName(),
- "Error looking up JMS destination : " + destinationJNDIname,
- cfgCtx.getAxisConfiguration());
- }
+ * Return the name assigned to this JMS CF definition
+ * @return name of the JMS CF
+ */
+ public String getName() {
+ return name;
}
/**
- * Stop listening on the given destination - for undeployment or stopping of services
- * closes the underlying Session opened to subscribe to the destination
- *
- * @param destinationJNDIname the JNDI name of the JMS destination
+ * The list of properties (including JNDI and non-JNDI)
+ * @return properties defined on the JMS CF
*/
- private void stoplisteningOnDestination(String destinationJNDIname) {
- Session session = jmsSessions.get(destinationJNDIname);
- if (session != null) {
- try {
- session.close();
- } catch (JMSException ignore) {}
- }
+ public Hashtable<String, String> getParameters() {
+ return parameters;
}
-
/**
- * Close all connections, sessions etc.. and stop this connection factory
+ * Get cached InitialContext
+ * @return cache InitialContext
*/
- public void stop() {
- if (connection != null) {
- for (Session session : jmsSessions.values()) {
- try {
- session.close();
- } catch (JMSException ignore) {}
- }
- try {
- connection.close();
- } catch (JMSException e) {
- log.warn("Error shutting down connection factory : " + name, e);
- }
- }
+ public Context getContext() {
+ return context;
}
/**
- * Return the provider specific [physical] Destination name if any
- * for the destination with the given JNDI name
- *
- * @param destinationJndi the JNDI name of the destination
- * @return the provider specific Destination name or null if cannot be found
- */
- private String getPhysicalDestinationName(String destinationJndi) {
- Destination destination = getPhysicalDestination(destinationJndi);
-
- if (destination != null) {
- try {
- if (destination instanceof Queue) {
- return ((Queue) destination).getQueueName();
- } else if (destination instanceof Topic) {
- return ((Topic) destination).getTopicName();
- }
- } catch (JMSException e) {
- log.warn("Error reading Destination name for JNDI destination : " + destinationJndi, e);
- }
- }
- return null;
+ * Cache level applicable for this JMS CF
+ * @return applicable cache level
+ */
+ public int getCacheLevel() {
+ return cacheLevel;
}
-
+
/**
- * Return the provider specific [physical] Destination if any
- * for the destination with the given JNDI name
- *
- * @param destinationJndi the JNDI name of the destination
- * @return the provider specific Destination or null if cannot be found
+ * Get the shared Destination - if defined
+ * @return the shared Destination, or null if none was defined for this JMS CF
*/
- private Destination getPhysicalDestination(String destinationJndi) {
- Destination destination = null;
+ public Destination getSharedDestination() {
+ return sharedDestination;
+ }
+ /**
+ * Lookup a Destination using this JMS CF definitions and JNDI name
+ * @param name JNDI name of the Destination
+ * @return JMS Destination for the given JNDI name or null
+ */
+ public Destination getDestination(String name) {
try {
- destination = JMSUtils.lookup(context, Destination.class, destinationJndi);
+ return JMSUtils.lookup(context, Destination.class, name);
} catch (NamingException e) {
-
- // if we are using ActiveMQ, check for dynamic Queues and Topics
- String provider = jndiProperties.get(Context.INITIAL_CONTEXT_FACTORY);
- if (provider.indexOf("activemq") != -1) {
- try {
- destination = JMSUtils.lookup(context, Destination.class,
- JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE + destinationJndi);
- } catch (NamingException ne) {
- try {
- destination = JMSUtils.lookup(context, Destination.class,
- JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC + destinationJndi);
- } catch (NamingException e1) {
- log.warn("Error looking up destination for JNDI name : " + destinationJndi);
- }
- }
- }
+ handleException("Unknown JMS Destination : " + name + " using : " + parameters, e);
}
-
- return destination;
+ return null;
}
/**
- * Return the EPR for the JMS Destination with the given JNDI name
- * when using this connection factory
- * @param jndiDestination the JNDI name of the JMS destination
- * @return the EPR for a service using this destination
+ * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter
+ * @return reply destination defined in the JMS CF
*/
- public EndpointReference getEPRForDestination(String jndiDestination) {
-
- StringBuffer sb = new StringBuffer();
- sb.append(JMSConstants.JMS_PREFIX).append(jndiDestination);
- sb.append("?").
- append(JMSConstants.CONFAC_JNDI_NAME_PARAM).
- append("=").append(getConnFactoryJNDIName());
- for (Map.Entry<String,String> entry : getJndiProperties().entrySet()) {
- sb.append("&").append(entry.getKey()).append("=").append(entry.getValue());
- }
-
- return new EndpointReference(sb.toString());
+ public String getReplyToDestination() {
+ return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION);
}
- // -------------------- getters and setters and trivial methods --------------------
-
- public void setConnFactoryJNDIName(String connFactoryJNDIName) {
- this.connFactoryJNDIName = connFactoryJNDIName;
+ private void handleException(String msg, Exception e) {
+ log.error(msg, e);
+ throw new AxisJMSException(msg, e);
}
- public Destination getDestination(String destinationJNDIName) {
- try {
- return JMSUtils.lookup(context, Destination.class, destinationJNDIName);
- } catch (NamingException ignore) {}
- return null;
+ /**
+ * Should the JMS 1.1 API be used? - defaults to yes
+ * @return true, if JMS 1.1 api should be used
+ */
+ public boolean isJmsSpec11() {
+ return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null ||
+ "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER));
}
- public void addJNDIContextProperty(String key, String value) {
- jndiProperties.put(key, value);
- }
+ /**
+ * Return the type of the JMS CF Destination
+ * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination
+ */
+ public Boolean isQueue() {
+ if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null &&
+ parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) {
+ return null;
+ }
- public String getName() {
- return name;
+ if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) {
+ if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
+ return true;
+ } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
+ return false;
+ } else {
+ throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " +
+ parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name);
+ }
+ } else {
+ if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
+ return true;
+ } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
+ return false;
+ } else {
+ throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " +
+ parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name);
+ }
+ }
}
- public String getConnFactoryJNDIName() {
- return connFactoryJNDIName;
+ /**
+ * Is a session transaction requested from users of this JMS CF?
+ * Yields true when the {@link JMSConstants#PARAM_SESSION_TRANSACTED} parameter is not set at
+ * all; otherwise the configured value is interpreted via Boolean.valueOf().
+ * NOTE(review): defaulting to a transacted session when nothing is configured looks
+ * surprising - confirm that this default is intended before relying on it.
+ * @return true if sessions created from this JMS CF should be transacted
+ */
+ private boolean isSessionTransacted() {
+ return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null ||
+ Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED));
}
- public ConnectionFactory getConFactory() {
- return conFactory;
- }
+ /**
+ * Create a new Connection
+ * @return a new Connection
+ */
+ private Connection createConnection() {
- public Hashtable<String,String> getJndiProperties() {
- return jndiProperties;
- }
+ Connection connection = null;
+ try {
+ connection = JMSUtils.createConnection(
+ conFactory,
+ parameters.get(JMSConstants.PARAM_JMS_USERNAME),
+ parameters.get(JMSConstants.PARAM_JMS_PASSWORD),
+ isJmsSpec11(), isQueue());
- public Context getContext() {
- return context;
- }
+ if (log.isDebugEnabled()) {
+ log.debug("New JMS Connection from JMS CF : " + name + " created");
+ }
- private void handleException(String msg, Exception e) throws AxisJMSException {
- log.error(msg, e);
- throw new AxisJMSException(msg, e);
+ } catch (JMSException e) {
+ handleException("Error acquiring a Connection from the JMS CF : " + name +
+ " using properties : " + parameters, e);
+ }
+ return connection;
}
- public String getConnectionFactoryType() {
- return connectionFactoryType;
- }
+ /**
+ * Create a new Session
+ * @param connection Connection to use
+ * @return A new Session
+ */
+ private Session createSession(Connection connection) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS Session from JMS CF : " + name);
+ }
+ return JMSUtils.createSession(
+ connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue());
- public void setConnectionFactoryType(String connectionFactoryType) {
- this.connectionFactoryType = connectionFactoryType;
- }
-
- public long getReconnectTimeout() {
- return reconnectTimeout;
+ } catch (JMSException e) {
+ handleException("Error creating JMS session from JMS CF : " + name, e);
+ }
+ return null;
}
- public void setReconnectTimeout(long reconnectTimeout) {
- this.reconnectTimeout = reconnectTimeout;
- }
+ /**
+ * Create a new MessageProducer
+ * @param session Session to be used
+ * @param destination Destination to be used
+ * @return a new MessageProducer
+ */
+ private MessageProducer createProducer(Session session, Destination destination) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS MessageProducer from JMS CF : " + name);
+ }
- public void onException(JMSException e) {
- log.error("JMS connection factory " + name + " encountered an error", e);
- boolean wasError = true;
+ return JMSUtils.createProducer(
+ session, destination, isQueue(), isJmsSpec11());
- if (jmsListener != null) {
- jmsListener.error(null, e);
+ } catch (JMSException e) {
+ handleException("Error creating JMS producer from JMS CF : " + name,e);
}
+ return null;
+ }
- // try to connect
- // if error occurs wait and try again
- while (wasError == true) {
+ /**
+ * Get a new Connection or shared Connection from this JMS CF
+ * @return new or shared Connection from this JMS CF
+ */
+ public Connection getConnection() {
+ if (cacheLevel > JMSConstants.CACHE_NONE) {
+ return getSharedConnection();
+ } else {
+ return createConnection();
+ }
+ }
- try {
- connectAndListen();
- wasError = false;
+ /**
+ * Get a new Session or shared Session from this JMS CF
+ * @param connection the Connection to be used
+ * @return new or shared Session from this JMS CF
+ */
+ public Session getSession(Connection connection) {
+ if (cacheLevel > JMSConstants.CACHE_CONNECTION) {
+ return getSharedSession();
+ } else {
+ return createSession((connection == null ? getConnection() : connection));
+ }
+ }
- } catch (Exception e1) {
- log.warn("JMS reconnection attempt failed for connection factory : " + name, e);
- }
+ /**
+ * Get a new MessageProducer or shared MessageProducer from this JMS CF
+ * @param connection the Connection to be used
+ * @param session the Session to be used
+ * @param destination the Destination to bind MessageProducer to
+ * @return new or shared MessageProducer from this JMS CF
+ */
+ public MessageProducer getMessageProducer(
+ Connection connection, Session session, Destination destination) {
+ if (cacheLevel > JMSConstants.CACHE_SESSION) {
+ return getSharedProducer();
+ } else {
+ return createProducer((session == null ? getSession(connection) : session), destination);
+ }
+ }
- if (wasError == true) {
- try {
- log.info("Attempting reconnection for connection factory " + name +
- " in " + getReconnectTimeout()/1000 + " seconds");
- Thread.sleep(getReconnectTimeout());
- } catch (InterruptedException ignore) {}
+ /**
+ * Get a new Connection or shared Connection from this JMS CF
+ * @return new or shared Connection from this JMS CF
+ */
+ private Connection getSharedConnection() {
+ if (sharedConnection == null) {
+ sharedConnection = createConnection();
+ if (log.isDebugEnabled()) {
+ log.debug("Created shared JMS Connection for JMS CF : " + name);
}
- } // wasError
-
+ }
+ return sharedConnection;
}
/**
- * Temporarily pause receiving new messages
+ * Get a shared Session from this JMS CF
+ * @return shared Session from this JMS CF
*/
- public void pause() {
- try {
- connection.stop();
- } catch (JMSException e) {
- handleException("Error pausing JMS connection for factory : " + name, e);
+ private Session getSharedSession() {
+ if (sharedSession == null) {
+ sharedSession = createSession(getSharedConnection());
+ if (log.isDebugEnabled()) {
+ log.debug("Created shared JMS Session for JMS CF : " + name);
+ }
}
+ return sharedSession;
}
/**
- * Resume from temporarily pause
+ * Get a shared MessageProducer from this JMS CF
+ * @return shared MessageProducer from this JMS CF
*/
- public void resume() {
- try {
- connection.start();
- } catch (JMSException e) {
- handleException("Error resuming JMS connection for factory : " + name, e);
+ private MessageProducer getSharedProducer() {
+ if (sharedProducer == null) {
+ // created lazily on first use, from the shared Session and bound to the shared Destination
+ sharedProducer = createProducer(getSharedSession(), sharedDestination);
+ if (log.isDebugEnabled()) {
+ log.debug("Created shared JMS MessageProducer for JMS CF : " + name);
+ }
}
+ return sharedProducer;
}
}
Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java Mon Dec 8 10:15:40 2008
@@ -36,80 +36,43 @@
* Class managing a set of {@link JMSConnectionFactory} objects.
*/
public class JMSConnectionFactoryManager {
+
private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class);
-
+
/** A Map containing the JMS connection factories managed by this, keyed by name */
- private final Map<String,JMSConnectionFactory> connectionFactories = new HashMap<String,JMSConnectionFactory>();
-
- private final ConfigurationContext cfgCtx;
-
- private final JMSListener jmsListener;
-
- private final WorkerPool workerPool;
-
- public JMSConnectionFactoryManager(ConfigurationContext cfgCtx) {
- this.cfgCtx = cfgCtx;
- jmsListener = null;
- workerPool = null;
- }
-
- public JMSConnectionFactoryManager(ConfigurationContext cfgCtx, JMSListener jmsListener, WorkerPool workerPool) {
- this.cfgCtx = cfgCtx;
- this.jmsListener = jmsListener;
- this.workerPool = workerPool;
+ private final Map<String,JMSConnectionFactory> connectionFactories =
+ new HashMap<String,JMSConnectionFactory>();
+
+ /**
+ * Construct a Connection factory manager for the JMS transport sender or receiver
+ * @param trpInDesc
+ */
+ public JMSConnectionFactoryManager(ParameterInclude trpInDesc) {
+ loadConnectionFactoryDefinitions(trpInDesc);
}
-
+
/**
* Create JMSConnectionFactory instances for the definitions in the transport configuration,
* and add these into our collection of connectionFactories map keyed by name
*
* @param trpDesc the transport description for JMS
*/
- public void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) {
-
- // iterate through all defined connection factories
- Iterator<?> conFacIter = trpDesc.getParameters().iterator();
-
- while (conFacIter.hasNext()) {
- Parameter conFacParams = (Parameter) conFacIter.next();
-
- JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(conFacParams.getName(), jmsListener, workerPool, cfgCtx);
- JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
+ private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) {
- connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
- }
- }
-
- /**
- * Get the names of the defined connection factories.
- * @return
- */
- public String[] getNames() {
- Collection<String> result = connectionFactories.keySet();
- return result.toArray(new String[result.size()]);
- }
-
- /**
- * Start all connection factories.
- *
- * @throws AxisFault
- */
- public void start() throws AxisFault {
- for (JMSConnectionFactory conFac : connectionFactories.values()) {
+ for (Object o : trpDesc.getParameters()) {
+ // keep the definition at hand, so that a failed construction can still be reported by
+ // name (the factory reference is never assigned when its constructor throws)
+ Parameter conFacParams = (Parameter) o;
try {
- conFac.connectAndListen();
- } catch (JMSException e) {
- handleException("Error starting connection factory : " + conFac.getName(), e);
- } catch (NamingException e) {
- handleException("Error starting connection factory : " + conFac.getName(), e);
+ JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(conFacParams);
+ connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
+ } catch (AxisJMSException e) {
+ // a broken definition is logged and skipped; the remaining definitions are still loaded
+ log.error("Error setting up connection factory : " + conFacParams.getName(), e);
}
}
}
/**
* Get the JMS connection factory with the given name.
- *
+ *
* @param name the name of the JMS connection factory
* @return the JMS connection factory or null if no connection factory with
* the given name exists
@@ -117,72 +80,45 @@
public JMSConnectionFactory getJMSConnectionFactory(String name) {
return connectionFactories.get(name);
}
-
+
/**
* Get the JMS connection factory that matches the given properties, i.e. referring to
- * the same underlying connection factory.
- *
- * @param props
+ * the same underlying connection factory. Used by the JMSSender to determine if already
+ * available resources should be used for outgoing messages
+ *
+ * @param props a Map of connection factory JNDI properties and name
* @return the JMS connection factory or null if no connection factory compatible
* with the given properties exists
*/
public JMSConnectionFactory getJMSConnectionFactory(Map<String,String> props) {
for (JMSConnectionFactory cf : connectionFactories.values()) {
- Map<String,String> jndiProperties = cf.getJndiProperties();
- if (equals(props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM), jndiProperties.get(JMSConstants.CONFAC_JNDI_NAME_PARAM))
+ Map<String,String> cfProperties = cf.getParameters();
+
+ if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME),
+ cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME))
&&
- equals(props.get(Context.INITIAL_CONTEXT_FACTORY), jndiProperties.get(Context.INITIAL_CONTEXT_FACTORY))
+ equals(props.get(Context.INITIAL_CONTEXT_FACTORY),
+ cfProperties.get(Context.INITIAL_CONTEXT_FACTORY))
&&
- equals(props.get(Context.PROVIDER_URL), jndiProperties.get(Context.PROVIDER_URL))
+ equals(props.get(Context.PROVIDER_URL),
+ cfProperties.get(Context.PROVIDER_URL))
&&
- equals(props.get(Context.SECURITY_PRINCIPAL), jndiProperties.get(Context.SECURITY_PRINCIPAL))
+ equals(props.get(Context.SECURITY_PRINCIPAL),
+ cfProperties.get(Context.SECURITY_PRINCIPAL))
&&
- equals(props.get(Context.SECURITY_CREDENTIALS), jndiProperties.get(Context.SECURITY_CREDENTIALS))) {
+ equals(props.get(Context.SECURITY_CREDENTIALS),
+ cfProperties.get(Context.SECURITY_CREDENTIALS))) {
return cf;
}
}
return null;
}
-
- /**
- * Prevents NullPointerException when s1 is null.
- * If both values are null this returns true
- */
- private static boolean equals(Object s1, Object s2) {
- if(s1 == s2) {
- return true;
- } else if(s1 != null && s1.equals(s2)) {
- return true;
- } else {
- return false;
- }
- }
/**
- * Pause all connection factories.
+ * Compare two values preventing NPEs
*/
- public void pause() {
- for (JMSConnectionFactory conFac : connectionFactories.values()) {
- conFac.pause();
- }
- }
-
- /**
- * Resume all connection factories.
- */
- public void resume() {
- for (JMSConnectionFactory conFac : connectionFactories.values()) {
- conFac.resume();
- }
- }
-
- /**
- * Stop all connection factories.
- */
- public void stop() {
- for (JMSConnectionFactory conFac : connectionFactories.values()) {
- conFac.stop();
- }
+ private static boolean equals(Object s1, Object s2) {
+ return s1 == s2 || s1 != null && s1.equals(s2);
}
protected void handleException(String msg, Exception e) throws AxisFault {
Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java Mon Dec 8 10:15:40 2008
@@ -24,10 +24,7 @@
*/
public static final String JMS_PREFIX = "jms:/";
- public static final String ACTIVEMQ_DYNAMIC_QUEUE = "dynamicQueues/";
- public static final String ACTIVEMQ_DYNAMIC_TOPIC = "dynamicTopics/";
-
- //------------------------------------ defaults ------------------------------------
+ //------------------------------------ defaults / constants ------------------------------------
/**
* The local (Axis2) JMS connection factory name of the default connection
* factory to be used, if a service does not explicitly state the connection
@@ -35,116 +32,234 @@
*/
public static final String DEFAULT_CONFAC_NAME = "default";
/**
- * The default JMS time out waiting for a reply
- * The default JMS time out waiting for a reply
+ * The default JMS time out waiting for a reply - also see {@link #JMS_WAIT_REPLY}
*/
public static final long DEFAULT_JMS_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
-
- //-------------------------- services.xml parameters --------------------------------
/**
- * The Parameter name indicating a JMS destination for requests
+ * Value indicating a Queue used for {@link #PARAM_DEST_TYPE}, {@link #PARAM_REPLY_DEST_TYPE}
*/
- public static final String DEST_PARAM = "transport.jms.Destination";
+ public static final String DESTINATION_TYPE_QUEUE = "queue";
/**
- * The Parameter name indicating a JMS destination type for requests. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC
+ * Value indicating a Topic used for {@link #PARAM_DEST_TYPE}, {@link #PARAM_REPLY_DEST_TYPE}
*/
- public static final String DEST_PARAM_TYPE = "transport.jms.DestinationType";
+ public static final String DESTINATION_TYPE_TOPIC = "topic";
/**
- * The Parameter name indicating the response JMS destination
+ * Value indicating a JMS 1.1 Generic Destination used by {@link #PARAM_DEST_TYPE}, {@link #PARAM_REPLY_DEST_TYPE}
*/
- public static final String REPLY_PARAM = "transport.jms.ReplyDestination";
+ public static final String DESTINATION_TYPE_GENERIC = "generic";
+
+ /** Do not cache any JMS resources between tasks (when receiving) or JMS CF's (when sending) */
+ public static final int CACHE_NONE = 0;
+ /** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/
+ public static final int CACHE_CONNECTION = 1;
+ /** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */
+ public static final int CACHE_SESSION = 2;
+ /** Cache the JMS connection, Session and Consumer between tasks when receiving*/
+ public static final int CACHE_CONSUMER = 3;
+ /** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */
+ public static final int CACHE_PRODUCER = 4;
+ /** automatic choice of an appropriate caching level (depending on the transaction strategy) */
+ public static final int CACHE_AUTO = 5;
+
+ /** A JMS 1.1 Generic Destination type or ConnectionFactory */
+ public static final int GENERIC = 0;
+ /** A Queue Destination type or ConnectionFactory */
+ public static final int QUEUE = 1;
+ /** A Topic Destination type or ConnectionFactory */
+ public static final int TOPIC = 2;
+
/**
- * The Parameter name indicating the response JMS destination. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC
+ * The EPR parameter name indicating the name of the message level property that indicates the content type.
*/
- public static final String REPLY_PARAM_TYPE = "transport.jms.ReplyDestinationType";
+ public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty";
+
+ //---------------------------------- services.xml parameters -----------------------------------
/**
- * The EPR parameter name indicating the message property to use to store the content type.
+ * The Service level Parameter name indicating the JMS destination for requests of a service
*/
- public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty";
-
+ public static final String PARAM_DESTINATION = "transport.jms.Destination";
/**
- * Values used for DEST_PARAM_TYPE, REPLY_PARAM_TYPE
+ * The Service level Parameter name indicating the destination type for requests.
+ * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
*/
- public static final String DESTINATION_TYPE_QUEUE = "queue";
- public static final String DESTINATION_TYPE_TOPIC = "topic";
-
+ public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType";
+ /**
+ * The Service level Parameter name indicating the [default] response destination of a service
+ */
+ public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination";
+ /**
+ * The Service level Parameter name indicating the response destination type
+ * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
+ */
+ public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType";
/**
* The Parameter name of an Axis2 service, indicating the JMS connection
* factory which should be used to listen for messages for it. This is
* the local (Axis2) name of the connection factory and not the JNDI name
*/
- public static final String CONFAC_PARAM = "transport.jms.ConnectionFactory";
- /**
- * If reconnect timeout if connection error occurs in seconds
- */
- public static final String RECONNECT_TIMEOUT = "transport.jms.ReconnectTimeout";
+ public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory";
/**
* Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC
*/
- public static final String CONFAC_TYPE = "transport.jms.ConnectionFactoryType";
+ public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType";
/**
* The Parameter name indicating the JMS connection factory JNDI name
*/
- public static final String CONFAC_JNDI_NAME_PARAM = "transport.jms.ConnectionFactoryJNDIName";
+ public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName";
/**
- * The parameter indicating the expected content type for messages received by the service.
+ * The Parameter indicating the expected content type for messages received by the service.
*/
public static final String CONTENT_TYPE_PARAM = "transport.jms.ContentType";
+ /**
+ * The Parameter indicating a final EPR as a String, to be published on the WSDL of a service
+ * Could occur more than once, and could provide additional connection properties or a subset
+ * of the properties auto computed. Also could replace IP addresses with hostnames, and expose
+ * public credentials to clients. If a user specifies this parameter, the auto generated EPR will
+ * not be exposed - unless an instance of this parameter is added with the string "legacy"
+ * This parameter could be used to expose EPR's conforming to the proposed SOAP/JMS spec
+ * until such time full support is implemented for it.
+ */
+ public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR";
+ /** The parameter indicating the JMS API specification to be used - if this is "1.1" the JMS
+ * 1.1 API would be used, else the JMS 1.0.2B
+ */
+ public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion";
- //------------ message context / transport header properties and client options ------------
/**
- * A MessageContext property or client Option stating the JMS message type
+ * The Parameter indicating whether the JMS Session should be transacted for the service
+ * Specified as a "true" or "false"
+ */
+ public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted";
+ /**
+ * The Parameter indicating the Session acknowledgement for the service. Must be one of the
+ * following Strings, or the appropriate Integer used by the JMS API
+ * "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" or "SESSION_TRANSACTED"
+ */
+ public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement";
+ /** A message selector to be used when messages are sought for this service */
+ public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector";
+ /** Is the Subscription durable ? - "true" or "false" See {@link #PARAM_DURABLE_SUB_NAME} */
+ public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable";
+ /** The name for the durable subscription See {@link #PARAM_SUB_DURABLE} */
+ public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName";
+ /**
+ * JMS Resource cachable level to be used for the service One of the following:
+ * {@link CACHE_NONE}, {@link CACHE_CONNECTION}, {@link CACHE_SESSION}, {@link CACHE_PRODUCER},
+ * {@link CACHE_CONSUMER}, or {@link CACHE_AUTO} - to let the transport decide
+ */
+ public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel";
+ /** Should a pub-sub connection receive messages published by itself? */
+ public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal";
+ /**
+ * The number of milliseconds to wait for a message on a consumer.receive() call
+ * negative number - wait forever
+ * 0 - do not wait at all
+ * positive number - indicates the number of milliseconds to wait
+ */
+ public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout";
+ /**
+ * The number of concurrent consumers to be created to poll for messages for this service
+ * For Topics, this should be ONE, to prevent receipt of multiple copies of the same message
+ */
+ public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers";
+ /**
+ * The maximum number of concurrent consumers for the service - See {@link PARAM_CONCURRENT_CONSUMERS}
+ */
+ public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers";
+ /**
+ * The number of idle (i.e. message-less) polling attempts before a worker task commits suicide,
+ * to scale down resources, as load decreases
+ */
+ public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit";
+ /**
+ * The maximum number of messages a polling worker task should process, before suicide - to
+ * prevent many longer running threads - default is unlimited (i.e. a worker task will live forever)
+ */
+ public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask";
+ /**
+ * Number of milliseconds before the first reconnection attempt is tried, on detection of an
+ * error. Subsequent retries follow a geometric series, where the
+ * duration = previous duration * factor
+ * This is further limited by the {@link PARAM_RECON_MAX_DURATION} to be meaningful
+ */
+ public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration";
+ /** @see #PARAM_RECON_INIT_DURATION */
+ public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor";
+ /** @see #PARAM_RECON_INIT_DURATION */
+ public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration";
+
+ /** The username to use when obtaining a JMS Connection */
+ public static final String PARAM_JMS_USERNAME = "transport.jms.UserName";
+ /** The password to use when obtaining a JMS Connection */
+ public static final String PARAM_JMS_PASSWORD = "transport.jms.Password";
+
+ //-------------- message context / transport header properties and client options --------------
+ /**
+ * A MessageContext property or client Option indicating the JMS message type
*/
public static final String JMS_MESSAGE_TYPE = "JMS_MESSAGE_TYPE";
/**
- * The message type indicating a BytesMessage. See JMS_MESSAGE_TYPE
+ * The message type indicating a BytesMessage. See {@link JMS_MESSAGE_TYPE}
*/
public static final String JMS_BYTE_MESSAGE = "JMS_BYTE_MESSAGE";
/**
- * The message type indicating a TextMessage. See JMS_MESSAGE_TYPE
+ * The message type indicating a TextMessage. See {@link JMS_MESSAGE_TYPE}
*/
public static final String JMS_TEXT_MESSAGE = "JMS_TEXT_MESSAGE";
/**
- * A MessageContext property or client Option stating the time to wait for a response JMS message
+ * A MessageContext property or client Option indicating the time to wait for a response JMS message
*/
public static final String JMS_WAIT_REPLY = "JMS_WAIT_REPLY";
/**
- * A MessageContext property or client Option stating the JMS correlation id
+ * A MessageContext property or client Option indicating the JMS correlation id
*/
public static final String JMS_COORELATION_ID = "JMS_COORELATION_ID";
/**
- * A MessageContext property or client Option stating the JMS message id
+ * A MessageContext property or client Option indicating the JMS message id
*/
public static final String JMS_MESSAGE_ID = "JMS_MESSAGE_ID";
/**
- * A MessageContext property or client Option stating the JMS delivery mode
+ * A MessageContext property or client Option indicating the JMS delivery mode as an Integer or String
+ * Value 1 - javax.jms.DeliveryMode.NON_PERSISTENT
+ * Value 2 - javax.jms.DeliveryMode.PERSISTENT
*/
public static final String JMS_DELIVERY_MODE = "JMS_DELIVERY_MODE";
/**
- * A MessageContext property or client Option stating the JMS destination
+ * A MessageContext property or client Option indicating the JMS destination to use on a Send
*/
public static final String JMS_DESTINATION = "JMS_DESTINATION";
/**
- * A MessageContext property or client Option stating the JMS expiration
+ * A MessageContext property or client Option indicating the JMS message expiration - a Long value
+ * specified as a String
*/
public static final String JMS_EXPIRATION = "JMS_EXPIRATION";
/**
- * A MessageContext property or client Option stating the JMS priority
- */
- public static final String JMS_PRIORITY = "JMS_PRIORITY";
- /**
- * A MessageContext property stating if the message is a redelivery
+ * A MessageContext property indicating if the message is a redelivery (Boolean as a String)
*/
public static final String JMS_REDELIVERED = "JMS_REDELIVERED";
/**
- * A MessageContext property or client Option stating the JMS replyTo
+ * A MessageContext property or client Option indicating the JMS replyTo Destination
*/
public static final String JMS_REPLY_TO = "JMS_REPLY_TO";
/**
- * A MessageContext property or client Option stating the JMS timestamp
+ * A MessageContext property or client Option indicating the JMS replyTo Destination type
+ * See {@link DESTINATION_TYPE_QUEUE} and {@link DESTINATION_TYPE_TOPIC}
+ */
+ public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE";
+ /**
+ * A MessageContext property or client Option indicating the JMS timestamp (Long specified as String)
*/
public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP";
/**
- * A MessageContext property or client Option stating the JMS type
+ * A MessageContext property indicating the JMS type String returned by {@link javax.jms.Message#getJMSType()}
*/
public static final String JMS_TYPE = "JMS_TYPE";
+ /**
+ * A MessageContext property or client Option indicating the JMS priority
+ */
+ public static final String JMS_PRIORITY = "JMS_PRIORITY";
+ /**
+ * A MessageContext property or client Option indicating the JMS time to live for message sent
+ */
+ public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE";
}
Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java Mon Dec 8 10:15:40 2008
@@ -16,7 +16,14 @@
package org.apache.axis2.transport.jms;
import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
import org.apache.axis2.transport.jms.ctype.ContentTypeRuleSet;
+import org.apache.axis2.addressing.EndpointReference;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
/**
* Class that links an Axis2 service to a JMS destination. Additionally, it contains
@@ -24,10 +31,11 @@
* into Axis2.
*/
public class JMSEndpoint {
+ private JMSConnectionFactory cf;
private AxisService service;
private String jndiDestinationName;
- private String destinationType;
- private String endpointReference;
+ private int destinationType = JMSConstants.GENERIC;
+ private Set<EndpointReference> endpointReferences = new HashSet<EndpointReference>();
private ContentTypeRuleSet contentTypeRuleSet;
public AxisService getService() {
@@ -50,20 +58,39 @@
this.jndiDestinationName = destinationJNDIName;
}
- public String getDestinationType() {
- return destinationType;
- }
-
public void setDestinationType(String destinationType) {
- this.destinationType = destinationType;
- }
-
- public String getEndpointReference() {
- return endpointReference;
- }
-
- public void setEndpointReference(String endpointReference) {
- this.endpointReference = endpointReference;
+ if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) {
+ this.destinationType = JMSConstants.TOPIC;
+ } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) {
+ this.destinationType = JMSConstants.QUEUE;
+ } else {
+ this.destinationType = JMSConstants.GENERIC;
+ }
+ }
+
+ public EndpointReference[] getEndpointReferences() {
+ return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]);
+ }
+
+ /**
+ * Compute the EPRs to be published for the service of this endpoint and add them to the
+ * set of endpoint references. Each String valued {@link JMSConstants#PARAM_PUBLISH_EPR}
+ * service parameter contributes one EPR - the value "legacy" (in any case) stands for the
+ * EPR computed by JMSUtils.getEPR(). The computed EPR is used as the only EPR when no such
+ * parameter has been specified.
+ */
+ public void computeEPRs() {
+ // collect the EPRs requested through parameters first, so that the fallback below can
+ // tell whether anything was specified at all
+ List<EndpointReference> eprs = new ArrayList<EndpointReference>();
+ for (Object o : getService().getParameters()) {
+ Parameter p = (Parameter) o;
+ if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) {
+ if ("legacy".equalsIgnoreCase((String) p.getValue())) {
+ // if "legacy" specified, compute and replace it
+ eprs.add(
+ new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
+ } else {
+ eprs.add(new EndpointReference((String) p.getValue()));
+ }
+ }
+ }
+
+ if (eprs.isEmpty()) {
+ // if nothing specified, compute and return legacy EPR
+ eprs.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
+ }
+ endpointReferences.addAll(eprs);
}
public ContentTypeRuleSet getContentTypeRuleSet() {
@@ -73,4 +100,12 @@
public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) {
this.contentTypeRuleSet = contentTypeRuleSet;
}
+
+ public JMSConnectionFactory getCf() {
+ return cf;
+ }
+
+ public void setCf(JMSConnectionFactory cf) {
+ this.cf = cf;
+ }
}
Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java Mon Dec 8 10:15:40 2008
@@ -24,7 +24,6 @@
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.transport.base.AbstractTransportListener;
import org.apache.axis2.transport.base.BaseConstants;
-import org.apache.axis2.transport.base.BaseUtils;
import org.apache.axis2.transport.base.ManagementSupport;
import org.apache.axis2.transport.base.event.TransportErrorListener;
import org.apache.axis2.transport.base.event.TransportErrorSource;
@@ -41,88 +40,57 @@
import javax.jms.TextMessage;
/**
- * The JMS Transport listener implementation. A JMS Listner will hold one or
- * more JMS connection factories, which would be created at initialization
- * time. This implementation does not support the creation of connection
- * factories at runtime. This JMS Listener registers with Axis to be notified
- * of service deployment/undeployment/start and stop, and enables or disables
- * listening for messages on the destinations as appropriate.
- * <p/>
- * A Service could state the JMS connection factory name and the destination
- * name for use as Parameters in its services.xml as shown in the example
- * below. If the connection name was not specified, it will use the connection
- * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a
- * factory is defined in the Axis2.xml. If the destination name is not specified
- * it will default to a JMS queue by the name of the service. If the destination
- * should be a Topic, it should be created on the JMS implementation, and
- * specified in the services.xml of the service.
- * <p/>
- * <parameter name="transport.jms.ConnectionFactory" locked="true">
- * myTopicConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="true">
- * dynamicTopics/something.TestTopic</parameter>
+ * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances
+ * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped.
+ * <p>
+ * A service indicates a JMS connection factory definition by name, which would be defined in the
+ * JMSListener in the axis2.xml. This provides a way to reuse common configuration between
+ * services, as well as to optimize the resources utilized.
+ * <p>
+ * If the connection factory name was not specified, it will default to the one named "default"
+ * ({@link JMSConstants#DEFAULT_CONFAC_NAME}).
+ * <p>
+ * If a destination JNDI name is not specified, a service will expect to use a Queue with the same
+ * JNDI name as the service. Additional parameters allow one to bind to a Topic or specify
+ * many more detailed control options. See the package documentation for more details.
+ * <p>
+ * All Destinations / JMS Administered objects used MUST be pre-created or already available
*/
public class JMSListener extends AbstractTransportListener implements ManagementSupport,
- TransportErrorSource {
+ TransportErrorSource {
public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
+ /** The JMSConnectionFactoryManager which centralizes the management of defined factories */
private JMSConnectionFactoryManager connFacManager;
/** A Map of service name to the JMS endpoints */
private Map<String,JMSEndpoint> serviceNameToEndpointMap = new HashMap<String,JMSEndpoint>();
-
+ /** A Map of service name to its ServiceTaskManager instances */
+ private Map<String, ServiceTaskManager> serviceNameToSTMMap =
+ new HashMap<String, ServiceTaskManager>();
private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this);
/**
- * This is the TransportListener initialization method invoked by Axis2
+ * TransportListener initialization
*
- * @param cfgCtx the Axis configuration context
+ * @param cfgCtx the Axis configuration context
* @param trpInDesc the TransportIn description
*/
public void init(ConfigurationContext cfgCtx,
- TransportInDescription trpInDesc) throws AxisFault {
- super.init(cfgCtx, trpInDesc);
-
- connFacManager = new JMSConnectionFactoryManager(cfgCtx, this, workerPool);
- // read the connection factory definitions and create them
- connFacManager.loadConnectionFactoryDefinitions(trpInDesc);
-
- // if no connection factories are defined, we cannot listen for any messages
- if (connFacManager.getNames().length == 0) {
- log.warn("No JMS connection factories are defined. Cannot listen for JMS");
- return;
- }
+ TransportInDescription trpInDesc) throws AxisFault {
+ super.init(cfgCtx, trpInDesc);
+ connFacManager = new JMSConnectionFactoryManager(trpInDesc);
log.info("JMS Transport Receiver/Listener initialized...");
}
/**
- * Start this JMS Listener (Transport Listener)
- *
- * @throws AxisFault
- */
- public void start() throws AxisFault {
- connFacManager.start();
- super.start();
- }
-
- /**
- * Stop the JMS Listener, and shutdown all of the connection factories
- */
- public void stop() throws AxisFault {
- super.stop();
- connFacManager.stop();
- }
-
- /**
- * Returns EPRs for the given service and IP over the JMS transport
+ * Returns EPRs for the given service over the JMS transport
*
* @param serviceName service name
- * @param ip ignored
- * @return the EPR for the service
- * @throws AxisFault not used
+ * @return the JMS EPRs for the service
*/
- public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+ public EndpointReference[] getEPRsForService(String serviceName) {
//Strip out the operation name
if (serviceName.indexOf('/') != -1) {
serviceName = serviceName.substring(0, serviceName.indexOf('/'));
@@ -133,28 +101,29 @@
}
JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName);
if (endpoint != null) {
- return new EndpointReference[] { new EndpointReference(endpoint.getEndpointReference()) };
+ return endpoint.getEndpointReferences();
} else {
return null;
}
}
/**
- * Prepare to listen for JMS messages on behalf of the given service
+ * Listen for JMS messages on behalf of the given service
*
- * @param service the service for which to listen for messages
+ * @param service the Axis service for which to listen for messages
*/
protected void startListeningForService(AxisService service) throws AxisFault {
JMSConnectionFactory cf = getConnectionFactory(service);
if (cf == null) {
throw new AxisFault("The service doesn't specify a JMS connection factory or refers " +
- "to an invalid factory.");
+ "to an invalid factory.");
}
JMSEndpoint endpoint = new JMSEndpoint();
endpoint.setService(service);
-
- Parameter destParam = service.getParameter(JMSConstants.DEST_PARAM);
+ endpoint.setCf(cf);
+
+ Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION);
if (destParam != null) {
endpoint.setJndiDestinationName((String)destParam.getValue());
} else {
@@ -162,10 +131,10 @@
endpoint.setJndiDestinationName(service.getName());
}
- Parameter destTypeParam = service.getParameter(JMSConstants.DEST_PARAM_TYPE);
+ Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE);
if (destTypeParam != null) {
String paramValue = (String) destTypeParam.getValue();
- if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
+ if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
endpoint.setDestinationType(paramValue);
} else {
@@ -186,15 +155,30 @@
} else {
endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam));
}
-
- // compute service EPR and keep for later use
- endpoint.setEndpointReference(JMSUtils.getEPR(cf, endpoint));
+
+ endpoint.computeEPRs(); // compute service EPR and keep for later use
serviceNameToEndpointMap.put(service.getName(), endpoint);
- log.info("Starting to listen on destination : " + endpoint.getJndiDestinationName() + " of type "
- + endpoint.getDestinationType() + " for service " + service.getName());
- cf.addDestination(endpoint);
- cf.startListeningOnDestination(endpoint);
+ ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool);
+ stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint));
+ stm.start();
+ serviceNameToSTMMap.put(service.getName(), stm);
+
+ for (int i=0; i<3; i++) {
+ if (stm.getActiveTaskCount() > 0) {
+ log.info("Started to listen on destination : " + stm.getDestinationJNDIName() +
+ " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
+ " for service " + stm.getServiceName());
+ return;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {}
+ }
+
+ log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() +
+ " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
+ " for service " + stm.getServiceName() + " have not yet started after 3 seconds ..");
}
/**
@@ -204,12 +188,22 @@
*/
protected void stopListeningForService(AxisService service) {
- JMSConnectionFactory cf = getConnectionFactory(service);
- if (cf != null) {
- // remove from the serviceNameToEprMap
- JMSEndpoint endpoint = serviceNameToEndpointMap.remove(service.getName());
+ ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName());
+ if (stm != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() +
+ " for service : " + stm.getServiceName());
+ }
+
+ stm.stop();
+
+ serviceNameToSTMMap.remove(service.getName());
+ serviceNameToEndpointMap.remove(service.getName());
+ log.info("Stopped listening for JMS messages to service : " + service.getName());
- cf.removeDestination(endpoint.getJndiDestinationName());
+ } else {
+ log.error("Unable to stop service : " + service.getName() +
+ " - unable to find its ServiceTaskManager");
}
}
/**
@@ -220,12 +214,12 @@
* @param service the AxisService
* @return the JMSConnectionFactory to be used, or null if reference is invalid
*/
- private JMSConnectionFactory getConnectionFactory(AxisService service) {
- Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM);
+ public JMSConnectionFactory getConnectionFactory(AxisService service) {
+ Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC);
// validate connection factory name (specified or default)
if (conFacParam != null) {
- return connFacManager.getJMSConnectionFactory((String)conFacParam.getValue());
+ return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue());
} else {
return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME);
}
@@ -240,11 +234,13 @@
public void pause() throws AxisFault {
if (state != BaseConstants.STARTED) return;
try {
- connFacManager.pause();
+ for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
+ stm.pause();
+ }
state = BaseConstants.PAUSED;
log.info("Listener paused");
} catch (AxisJMSException e) {
- log.error("At least one connection factory could not be paused", e);
+ log.error("At least one service could not be paused", e);
}
}
@@ -255,11 +251,13 @@
public void resume() throws AxisFault {
if (state != BaseConstants.PAUSED) return;
try {
- connFacManager.resume();
+ for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
+ stm.resume();
+ }
state = BaseConstants.STARTED;
log.info("Listener resumed");
} catch (AxisJMSException e) {
- log.error("At least one connection factory could not be resumed", e);
+ log.error("At least one service could not be resumed", e);
}
}