You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2008/08/23 22:43:58 UTC
svn commit: r688410 - in
/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms:
JMSConnectionFactory.java JMSListener.java JMSMessageReceiver.java
JMSSender.java
Author: veithen
Date: Sat Aug 23 13:43:57 2008
New Revision: 688410
URL: http://svn.apache.org/viewvc?rev=688410&view=rev
Log:
SYNAPSE-369: Eliminated JMSConnectionFactory#serviceDestinationNameMapping and modified the JMS transport listener to use one JMSMessageReceiver instance per Axis service.
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java Sat Aug 23 13:43:57 2008
@@ -20,6 +20,7 @@
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.base.threads.WorkerPool;
import javax.jms.*;
import javax.naming.Context;
@@ -77,14 +78,16 @@
/** The name used for the connection factory definition within Axis2 */
private String name = null;
+ /** The JMS transport listener instance. */
+ private final JMSListener jmsListener;
+ /** The worker pool to use. */
+ private final WorkerPool workerPool;
/** The JNDI name of the actual connection factory */
private String connFactoryJNDIName = null;
/** Map of destination JNDI names to service names */
private Map<String,String> serviceJNDINameMapping = null;
/** Map of destination JNDI names to destination types*/
private Map<String,String> destinationTypeMapping = null;
- /** Map of JMS destination names to service names */
- private Map<String,String> serviceDestinationNameMapping = null;
/** JMS Sessions currently active. One session for each Destination / Service */
private Map<String,Session> jmsSessions = null;
/** Properties of the connection factory to acquire the initial context */
@@ -97,8 +100,6 @@
private String connectionFactoryType = null;
/** The JMS Connection opened */
private Connection connection = null;
- /** The JMS Message receiver for this connection factory */
- private JMSMessageReceiver jmsMessageReceiver = null;
/** The axis2 configuration context */
private ConfigurationContext cfgCtx = null;
/** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
@@ -110,14 +111,19 @@
*
* @param name the connection factory name specified in the axis2.xml for the
* TransportListener or the TransportSender using this
+ * @param jmsListener the JMS transport listener, or null if the connection factory
+ * is not linked to a transport listener
+ * @param workerPool the worker pool to be used to process incoming messages; may be null
* @param cfgCtx the axis2 configuration context
*/
- public JMSConnectionFactory(String name, ConfigurationContext cfgCtx) {
+ public JMSConnectionFactory(String name, JMSListener jmsListener, WorkerPool workerPool,
+ ConfigurationContext cfgCtx) {
this.name = name;
+ this.jmsListener = jmsListener;
+ this.workerPool = workerPool;
this.cfgCtx = cfgCtx;
serviceJNDINameMapping = new HashMap<String,String>();
destinationTypeMapping = new HashMap<String,String>();
- serviceDestinationNameMapping = new HashMap<String,String>();
jndiProperties = new Hashtable<String,String>();
jmsSessions = new HashMap<String,Session>();
}
@@ -155,7 +161,6 @@
serviceJNDINameMapping.put(destinationJNDIName, serviceName);
destinationTypeMapping.put(destinationJNDIName, destinationType);
- serviceDestinationNameMapping.put(destinationName, serviceName);
log.info("Mapped JNDI name : " + destinationJNDIName + " and JMS Destination name : " +
destinationName + " against service : " + serviceName);
@@ -167,15 +172,8 @@
* @param jndiDestinationName the JNDI name of the JMS destination to be removed
*/
public void removeDestination(String jndiDestinationName) {
-
- // find and save provider specific Destination name before we delete
- String providerSpecificDestination = getPhysicalDestinationName(jndiDestinationName);
stoplisteningOnDestination(jndiDestinationName);
-
serviceJNDINameMapping.remove(jndiDestinationName);
- if (providerSpecificDestination != null) {
- serviceDestinationNameMapping.remove(providerSpecificDestination);
- }
}
/**
@@ -250,9 +248,11 @@
handleException("Error connecting to Connection Factory : " + connFactoryJNDIName, e);
}
- for (String destJNDIName : serviceJNDINameMapping.keySet()) {
+ for (Map.Entry<String,String> entry : serviceJNDINameMapping.entrySet()) {
+ String destJNDIName = entry.getKey();
+ String serviceName = entry.getValue();
String destinationType = destinationTypeMapping.get(destJNDIName);
- startListeningOnDestination(destJNDIName, destinationType);
+ startListeningOnDestination(destJNDIName, destinationType, serviceName);
}
connection.start(); // indicate readiness to start receiving messages
@@ -296,7 +296,9 @@
*
* @param destinationJNDIname the JMS destination to listen on
*/
- public void startListeningOnDestination(String destinationJNDIname, String destinationType) {
+ public void startListeningOnDestination(String destinationJNDIname,
+ String destinationType,
+ String serviceName) {
Session session = jmsSessions.get(destinationJNDIname);
// if we already had a session open, close it first
@@ -319,7 +321,8 @@
}
MessageConsumer consumer = JMSUtils.createConsumer(session, destination);
- consumer.setMessageListener(jmsMessageReceiver);
+ consumer.setMessageListener(new JMSMessageReceiver(jmsListener, this, workerPool,
+ cfgCtx, serviceName));
jmsSessions.put(destinationJNDIname, session);
// catches NameNotFound and JMSExceptions and marks service as faulty
@@ -491,38 +494,6 @@
}
// -------------------- getters and setters and trivial methods --------------------
- /**
- * Return the service name using the JMS destination given by the JNDI name
- *
- * @param jmsDestinationName the JMS destination name
- * @return the name of the service using the destination
- */
- public String getServiceNameForDestinationName(String jmsDestinationName) {
- return serviceDestinationNameMapping.get(jmsDestinationName);
- }
-
- /**
- * Return the service name using the JMS destination and its JNDI name
- *
- * @param dest the JMS Destination Queue or Topic
- * @param jmsDestinationName the JMS destination name
- * @return the name of the service using the destination
- */
- public String getServiceNameForDestination(Destination dest, String jmsDestinationName) {
- String serviceName = serviceDestinationNameMapping.get(jmsDestinationName);
-
- // hack to get around the crazy Active MQ dynamic queue and topic issues
- if (serviceName == null) {
- String provider = getJndiProperties().get(Context.INITIAL_CONTEXT_FACTORY);
- if (provider.indexOf("activemq") != -1) {
- serviceName = getServiceNameForJNDIName(
- (dest instanceof Queue ?
- JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE :
- JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC) + jmsDestinationName);
- }
- }
- return serviceName;
- }
/**
* Return the service name using the JMS destination given by the JNDI name
@@ -565,18 +536,10 @@
return jndiProperties;
}
- public JMSMessageReceiver getJmsMessageReceiver() {
- return jmsMessageReceiver;
- }
-
public Context getContext() {
return context;
}
- public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) {
- this.jmsMessageReceiver = jmsMessageReceiver;
- }
-
private void handleException(String msg, Exception e) throws AxisJMSException {
log.error(msg, e);
throw new AxisJMSException(msg, e);
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java Sat Aug 23 13:43:57 2008
@@ -94,9 +94,6 @@
public void start() throws AxisFault {
for (JMSConnectionFactory conFac : connectionFactories.values()) {
- conFac.setJmsMessageReceiver(
- new JMSMessageReceiver(this, conFac, workerPool, cfgCtx));
-
try {
conFac.connectAndListen();
} catch (JMSException e) {
@@ -167,7 +164,7 @@
log.info("Starting to listen on destination : " + destinationName + " of type "
+ destinationType + " for service " + service.getName());
cf.addDestination(destinationName, destinationType, service.getName());
- cf.startListeningOnDestination(destinationName, destinationType);
+ cf.startListeningOnDestination(destinationName, destinationType, service.getName());
}
/**
@@ -229,7 +226,7 @@
Parameter conFacParams = (Parameter) conFacIter.next();
JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
+ new JMSConnectionFactory(conFacParams.getName(), this, workerPool, cfgCtx);
JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java Sat Aug 23 13:43:57 2008
@@ -50,6 +50,8 @@
private ConfigurationContext cfgCtx = null;
/** A reference to the JMS Connection Factory to which this applies */
private JMSConnectionFactory jmsConnectionFactory = null;
+ /** The name of the service this message receiver is bound to. */
+ final String serviceName;
/** Metrics collector */
private MetricsCollector metrics = null;
@@ -60,13 +62,15 @@
* @param jmsConFac the JMS connection factory we are associated with
* @param workerPool the worker thread pool to be used
* @param cfgCtx the axis ConfigurationContext
+ * @param serviceName the name of the Axis service
*/
JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
- WorkerPool workerPool, ConfigurationContext cfgCtx) {
+ WorkerPool workerPool, ConfigurationContext cfgCtx, String serviceName) {
this.jmsListener = jmsListener;
this.jmsConnectionFactory = jmsConFac;
this.workerPool = workerPool;
this.cfgCtx = cfgCtx;
+ this.serviceName = serviceName;
this.metrics = jmsListener.getMetricsCollector();
}
@@ -156,16 +160,6 @@
} catch (JMSException ignore) {}
try {
- Destination dest = message.getJMSDestination();
- String destinationName = null;
- if (dest instanceof Queue) {
- destinationName = ((Queue) dest).getQueueName();
- } else if (dest instanceof Topic) {
- destinationName = ((Topic) dest).getTopicName();
- }
-
- String serviceName =
- jmsConnectionFactory.getServiceNameForDestination(dest, destinationName);
String soapAction = JMSUtils.getInstace().
getProperty(message, BaseConstants.SOAPACTION);
AxisService service = null;
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java Sat Aug 23 13:43:57 2008
@@ -556,7 +556,7 @@
Parameter conFacParams = (Parameter) conFacIter.next();
JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
+ new JMSConnectionFactory(conFacParams.getName(), null, null, cfgCtx);
JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
try {