You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2008/08/23 22:43:58 UTC

svn commit: r688410 - in /synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms: JMSConnectionFactory.java JMSListener.java JMSMessageReceiver.java JMSSender.java

Author: veithen
Date: Sat Aug 23 13:43:57 2008
New Revision: 688410

URL: http://svn.apache.org/viewvc?rev=688410&view=rev
Log:
SYNAPSE-369: Eliminated JMSConnectionFactory#serviceDestinationNameMapping and modified the JMS transport listener to use one JMSMessageReceiver instance per Axis service.

Modified:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java Sat Aug 23 13:43:57 2008
@@ -20,6 +20,7 @@
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.base.threads.WorkerPool;
 
 import javax.jms.*;
 import javax.naming.Context;
@@ -77,14 +78,16 @@
 
     /** The name used for the connection factory definition within Axis2 */
     private String name = null;
+    /** The JMS transport listener instance. */
+    private final JMSListener jmsListener;
+    /** The worker pool to use. */
+    private final WorkerPool workerPool;
     /** The JNDI name of the actual connection factory */
     private String connFactoryJNDIName = null;
     /** Map of destination JNDI names to service names */
     private Map<String,String> serviceJNDINameMapping = null;
     /** Map of destination JNDI names to destination types*/
     private Map<String,String> destinationTypeMapping = null;
-    /** Map of JMS destination names to service names */
-    private Map<String,String> serviceDestinationNameMapping = null;
     /** JMS Sessions currently active. One session for each Destination / Service */
     private Map<String,Session> jmsSessions = null;
     /** Properties of the connection factory to acquire the initial context */
@@ -97,8 +100,6 @@
     private String connectionFactoryType = null;
     /** The JMS Connection opened */
     private Connection connection = null;
-    /** The JMS Message receiver for this connection factory */
-    private JMSMessageReceiver jmsMessageReceiver = null;
     /** The axis2 configuration context */
     private ConfigurationContext cfgCtx = null;
     /** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
@@ -110,14 +111,19 @@
      *
      * @param name the connection factory name specified in the axis2.xml for the
      * TransportListener or the TransportSender using this
+     * @param jmsListener the JMS transport listener, or null if the connection factory
+     *                    is not linked to a transport listener
+     * @param workerPool the worker pool to be used to process incoming messages; may be null
      * @param cfgCtx the axis2 configuration context
      */
-    public JMSConnectionFactory(String name, ConfigurationContext cfgCtx) {
+    public JMSConnectionFactory(String name, JMSListener jmsListener, WorkerPool workerPool,
+                                ConfigurationContext cfgCtx) {
         this.name = name;
+        this.jmsListener = jmsListener;
+        this.workerPool = workerPool;
         this.cfgCtx = cfgCtx;
         serviceJNDINameMapping = new HashMap<String,String>();
         destinationTypeMapping = new HashMap<String,String>();
-        serviceDestinationNameMapping = new HashMap<String,String>();
         jndiProperties = new Hashtable<String,String>();
         jmsSessions = new HashMap<String,Session>();
     }
@@ -155,7 +161,6 @@
 
         serviceJNDINameMapping.put(destinationJNDIName, serviceName);
         destinationTypeMapping.put(destinationJNDIName, destinationType);
-        serviceDestinationNameMapping.put(destinationName, serviceName);
 
         log.info("Mapped JNDI name : " + destinationJNDIName + " and JMS Destination name : " +
             destinationName + " against service : " + serviceName);
@@ -167,15 +172,8 @@
      * @param jndiDestinationName the JNDI name of the JMS destination to be removed
      */
     public void removeDestination(String jndiDestinationName) {
-
-        // find and save provider specific Destination name before we delete
-        String providerSpecificDestination = getPhysicalDestinationName(jndiDestinationName);
         stoplisteningOnDestination(jndiDestinationName);
-
         serviceJNDINameMapping.remove(jndiDestinationName);
-        if (providerSpecificDestination != null) {
-            serviceDestinationNameMapping.remove(providerSpecificDestination);
-        }
     }
 
     /**
@@ -250,9 +248,11 @@
             handleException("Error connecting to Connection Factory : " + connFactoryJNDIName, e);
         }
 
-        for (String destJNDIName : serviceJNDINameMapping.keySet()) {
+        for (Map.Entry<String,String> entry : serviceJNDINameMapping.entrySet()) {
+            String destJNDIName = entry.getKey();
+            String serviceName = entry.getValue();
             String destinationType = destinationTypeMapping.get(destJNDIName);
-            startListeningOnDestination(destJNDIName, destinationType);
+            startListeningOnDestination(destJNDIName, destinationType, serviceName);
         }
 
         connection.start(); // indicate readiness to start receiving messages
@@ -296,7 +296,9 @@
      *
      * @param destinationJNDIname the JMS destination to listen on
      */
-    public void startListeningOnDestination(String destinationJNDIname, String destinationType) {
+    public void startListeningOnDestination(String destinationJNDIname,
+                                            String destinationType,
+                                            String serviceName) {
 
         Session session = jmsSessions.get(destinationJNDIname);
         // if we already had a session open, close it first
@@ -319,7 +321,8 @@
             }
 
             MessageConsumer consumer = JMSUtils.createConsumer(session, destination);
-            consumer.setMessageListener(jmsMessageReceiver);
+            consumer.setMessageListener(new JMSMessageReceiver(jmsListener, this, workerPool,
+                    cfgCtx, serviceName));
             jmsSessions.put(destinationJNDIname, session);
 
         // catches NameNotFound and JMSExceptions and marks service as faulty    
@@ -491,38 +494,6 @@
     }
 
     // -------------------- getters and setters and trivial methods --------------------
-    /**
-     * Return the service name using the JMS destination given by the JNDI name
-     *
-     * @param jmsDestinationName the JMS destination name
-     * @return the name of the service using the destination
-     */
-    public String getServiceNameForDestinationName(String jmsDestinationName) {
-        return serviceDestinationNameMapping.get(jmsDestinationName);
-    }
-
-    /**
-     * Return the service name using the JMS destination and its JNDI name
-     *
-     * @param dest the JMS Destination Queue or Topic
-     * @param jmsDestinationName the JMS destination name
-     * @return the name of the service using the destination
-     */
-    public String getServiceNameForDestination(Destination dest, String jmsDestinationName) {
-        String serviceName = serviceDestinationNameMapping.get(jmsDestinationName);
-
-        // hack to get around the crazy Active MQ dynamic queue and topic issues
-        if (serviceName == null) {
-            String provider = getJndiProperties().get(Context.INITIAL_CONTEXT_FACTORY);
-            if (provider.indexOf("activemq") != -1) {
-                serviceName = getServiceNameForJNDIName(
-                    (dest instanceof Queue ?
-                        JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE :
-                        JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC) + jmsDestinationName);
-            }
-        }
-        return serviceName;
-    }
 
     /**
      * Return the service name using the JMS destination given by the JNDI name
@@ -565,18 +536,10 @@
         return jndiProperties;
     }
 
-    public JMSMessageReceiver getJmsMessageReceiver() {
-        return jmsMessageReceiver;
-    }
-    
     public Context getContext() {
         return context;
     }
 
-    public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) {
-        this.jmsMessageReceiver = jmsMessageReceiver;
-    }
-
     private void handleException(String msg, Exception e) throws AxisJMSException {
         log.error(msg, e);
         throw new AxisJMSException(msg, e);

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java Sat Aug 23 13:43:57 2008
@@ -94,9 +94,6 @@
     public void start() throws AxisFault {
 
         for (JMSConnectionFactory conFac : connectionFactories.values()) {
-            conFac.setJmsMessageReceiver(
-                new JMSMessageReceiver(this, conFac, workerPool, cfgCtx));
-
             try {
                 conFac.connectAndListen();
             } catch (JMSException e) {
@@ -167,7 +164,7 @@
         log.info("Starting to listen on destination : " + destinationName + " of type "
                 + destinationType + " for service " + service.getName());
         cf.addDestination(destinationName, destinationType, service.getName());
-        cf.startListeningOnDestination(destinationName, destinationType);
+        cf.startListeningOnDestination(destinationName, destinationType, service.getName());
     }
 
     /**
@@ -229,7 +226,7 @@
             Parameter conFacParams = (Parameter) conFacIter.next();
 
             JMSConnectionFactory jmsConFactory =
-                new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
+                new JMSConnectionFactory(conFacParams.getName(), this, workerPool, cfgCtx);
             JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
 
             connectionFactories.put(jmsConFactory.getName(), jmsConFactory);

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java Sat Aug 23 13:43:57 2008
@@ -50,6 +50,8 @@
     private ConfigurationContext cfgCtx = null;
     /** A reference to the JMS Connection Factory to which this applies */
     private JMSConnectionFactory jmsConnectionFactory = null;
+    /** The name of the service this message receiver is bound to. */
+    final String serviceName;
     /** Metrics collector */
     private MetricsCollector metrics = null;
 
@@ -60,13 +62,15 @@
      * @param jmsConFac the JMS connection factory we are associated with
      * @param workerPool the worker thread pool to be used
      * @param cfgCtx the axis ConfigurationContext
+     * @param serviceName the name of the Axis service
      */
     JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
-                       WorkerPool workerPool, ConfigurationContext cfgCtx) {
+                       WorkerPool workerPool, ConfigurationContext cfgCtx, String serviceName) {
         this.jmsListener = jmsListener;
         this.jmsConnectionFactory = jmsConFac;
         this.workerPool = workerPool;
         this.cfgCtx = cfgCtx;
+        this.serviceName = serviceName;
         this.metrics = jmsListener.getMetricsCollector();
     }
 
@@ -156,16 +160,6 @@
             } catch (JMSException ignore) {}
 
             try {
-                Destination dest = message.getJMSDestination();
-                String destinationName = null;
-                if (dest instanceof Queue) {
-                    destinationName = ((Queue) dest).getQueueName();
-                } else if (dest instanceof Topic) {
-                    destinationName = ((Topic) dest).getTopicName();
-                }
-
-                String serviceName =
-                    jmsConnectionFactory.getServiceNameForDestination(dest, destinationName);
                 String soapAction = JMSUtils.getInstace().
                     getProperty(message, BaseConstants.SOAPACTION);
                 AxisService service = null;

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java Sat Aug 23 13:43:57 2008
@@ -556,7 +556,7 @@
             Parameter conFacParams = (Parameter) conFacIter.next();
 
             JMSConnectionFactory jmsConFactory =
-                new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
+                new JMSConnectionFactory(conFacParams.getName(), null, null, cfgCtx);
             JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
 
             try {