You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/09/07 06:21:20 UTC

svn commit: r573445 - in /webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms: JMSConnectionFactory.java JMSSender.java JMSUtils.java

Author: asankha
Date: Thu Sep  6 21:21:17 2007
New Revision: 573445

URL: http://svn.apache.org/viewvc?rev=573445&view=rev
Log:
Updates mainly to support sending to JMS 1.0.x providers, by using the domain-specific Queue/Topic APIs instead of the unified JMS 1.1 API

Modified:
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=573445&r1=573444&r2=573445&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Thu Sep  6 21:21:17 2007
@@ -182,7 +182,7 @@
      * @throws JMSException on exceptions
      * @throws NamingException on exceptions
      */
-    public void connectAndListen() throws JMSException, NamingException {
+    public synchronized void connectAndListen() throws JMSException, NamingException {
 
         // if this is a reconnection/re-initialization effort after the detection of a
         // disconnection, close all sessions and the CF connection and re-initialize
@@ -211,7 +211,31 @@
         log.info("Connected to the JMS connection factory : " + connFactoryJNDIName);
 
         try {
-            connection = conFactory.createConnection();
+            QueueConnectionFactory qConFac = null;
+            TopicConnectionFactory tConFac = null;
+            if (conFactory instanceof QueueConnectionFactory) {
+                qConFac = (QueueConnectionFactory) conFactory;
+            } else {
+                tConFac = (TopicConnectionFactory) conFactory;
+            }
+
+            String user = (String) jndiProperties.get(Context.SECURITY_PRINCIPAL);
+            String pass = (String) jndiProperties.get(Context.SECURITY_CREDENTIALS);
+
+            if (user != null && pass != null) {
+                if (qConFac != null) {
+                    connection = qConFac.createQueueConnection(user, pass);
+                } else {
+                    connection = tConFac.createTopicConnection(user, pass);
+                }
+            } else {
+                if (qConFac != null) {
+                    connection = qConFac.createQueueConnection();
+                } else {
+                    connection = tConFac.createTopicConnection();
+                }
+            }
+
         } catch (JMSException e) {
             handleException("Error connecting to Connection Factory : " + connFactoryJNDIName, e);
         }
@@ -232,11 +256,24 @@
      * @return a JMS Session to send messages to the destination using this connection factory
      */
     public Session getSessionForDestination(String destinationJNDIname) {
+
         Session session = (Session) jmsSessions.get(destinationJNDIname);
+
         if (session == null) {
-            try {
-                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                jmsSessions.put(destinationJNDIname,  session);
+            try {                
+                Destination dest = (Destination) context.lookup(destinationJNDIname);
+                if (dest instanceof Topic) {
+                    session = ((TopicConnection) connection).
+                        createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+                } else {
+                    session = ((QueueConnection) connection).
+                        createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+                }
+
+                jmsSessions.put(destinationJNDIname, session);
+
+            } catch (NamingException e) {
+                handleException("Error looking up destination : " + destinationJNDIname, e);
             } catch (JMSException e) {
                 handleException("Unable to create a session using connection factory : " + name, e);
             }

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=573445&r1=573444&r2=573445&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Thu Sep  6 21:21:17 2007
@@ -35,6 +35,8 @@
 
 import javax.jms.*;
 import javax.activation.DataHandler;
+import javax.naming.Context;
+import javax.naming.NamingException;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.*;
@@ -117,8 +119,39 @@
                     jmsOut.loadConnectionFactoryFromProperies();
                     try {
                         // create a one time connection and session to be used
-                        connection = jmsOut.getConnectionFactory().createConnection();
-                        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Hashtable jndiProps = jmsOut.getProperties();
+                        String user = (String) jndiProps.get(Context.SECURITY_PRINCIPAL);
+                        String pass = (String) jndiProps.get(Context.SECURITY_CREDENTIALS);
+
+                        QueueConnectionFactory qConFac = null;
+                        TopicConnectionFactory tConFac = null;
+                        if (jmsOut.getConnectionFactory() instanceof QueueConnectionFactory) {
+                            qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
+                        } else {
+                            tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
+                        }
+
+                        if (user != null && pass != null) {
+                            if (qConFac != null) {
+                                connection = qConFac.createQueueConnection(user, pass);
+                            } else {
+                                connection = tConFac.createTopicConnection(user, pass);
+                            }
+                        } else {
+                           if (qConFac != null) {
+                                connection = qConFac.createQueueConnection();
+                            } else {
+                                connection = tConFac.createTopicConnection();
+                            }
+                        }
+
+                        if (qConFac != null) {
+                            session = ((QueueConnection)connection).
+                                createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+                        } else {
+                            session = ((TopicConnection)connection).
+                                createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+                        }
 
                     } catch (JMSException e) {
                         handleException("Error creating a connection/session for : " + targetAddress);
@@ -416,6 +449,14 @@
             JMSConnectionFactory jmsConFactory =
                 new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
             JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
+
+            try {
+                jmsConFactory.connectAndListen();
+            } catch (NamingException e) {
+                log.warn("Error looking up JMS connection factory : " + jmsConFactory.getName(), e);
+            } catch (JMSException e) {
+                log.warn("Error connecting to JMS connection factory : " + jmsConFactory.getName(), e);
+            }
 
             connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
         }

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=573445&r1=573444&r2=573445&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Thu Sep  6 21:21:17 2007
@@ -238,6 +238,8 @@
                     Context.SECURITY_CREDENTIALS, (String) p.getValue());
             } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) {
                 jmsConFactory.setConnFactoryJNDIName((String) p.getValue());
+                jmsConFactory.addJNDIContextProperty(
+                    JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue());
             }
         }
     }
@@ -361,13 +363,20 @@
      */
     public static void sendMessageToJMSDestination(Session session,
         Destination destination, Message message) throws AxisFault {
+
         MessageProducer producer = null;
         try {
-            producer = session.createProducer(destination);
             if (log.isDebugEnabled()) {
                 log.debug("Sending message to destination : " + destination);
             }
-            producer.send(message);
+
+            if (destination instanceof Queue) {
+                producer = ((QueueSession) session).createSender((Queue) destination);
+                ((QueueSender) producer).send(message);
+            } else {
+                producer = ((TopicSession) session).createPublisher((Topic) destination);
+                ((TopicPublisher) producer).publish(message);
+            }
 
         } catch (JMSException e) {
             handleException("Error creating a producer or sending to : " + destination, e);



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org