You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/09/07 06:21:20 UTC
svn commit: r573445 - in
/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms:
JMSConnectionFactory.java JMSSender.java JMSUtils.java
Author: asankha
Date: Thu Sep 6 21:21:17 2007
New Revision: 573445
URL: http://svn.apache.org/viewvc?rev=573445&view=rev
Log:
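Updates mainly to support sending to JMS 1.0.x providers (uses the domain-specific Queue/Topic connection, session and producer APIs instead of the unified JMS 1.1 API)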
Modified:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=573445&r1=573444&r2=573445&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Thu Sep 6 21:21:17 2007
@@ -182,7 +182,7 @@
* @throws JMSException on exceptions
* @throws NamingException on exceptions
*/
- public void connectAndListen() throws JMSException, NamingException {
+ public synchronized void connectAndListen() throws JMSException, NamingException {
// if this is a reconnection/re-initialization effort after the detection of a
// disconnection, close all sessions and the CF connection and re-initialize
@@ -211,7 +211,31 @@
log.info("Connected to the JMS connection factory : " + connFactoryJNDIName);
try {
- connection = conFactory.createConnection();
+ QueueConnectionFactory qConFac = null;
+ TopicConnectionFactory tConFac = null;
+ if (conFactory instanceof QueueConnectionFactory) {
+ qConFac = (QueueConnectionFactory) conFactory;
+ } else {
+ tConFac = (TopicConnectionFactory) conFactory;
+ }
+
+ String user = (String) jndiProperties.get(Context.SECURITY_PRINCIPAL);
+ String pass = (String) jndiProperties.get(Context.SECURITY_CREDENTIALS);
+
+ if (user != null && pass != null) {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection(user, pass);
+ } else {
+ connection = tConFac.createTopicConnection(user, pass);
+ }
+ } else {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection();
+ } else {
+ connection = tConFac.createTopicConnection();
+ }
+ }
+
} catch (JMSException e) {
handleException("Error connecting to Connection Factory : " + connFactoryJNDIName, e);
}
@@ -232,11 +256,24 @@
* @return a JMS Session to send messages to the destination using this connection factory
*/
public Session getSessionForDestination(String destinationJNDIname) {
+
Session session = (Session) jmsSessions.get(destinationJNDIname);
+
if (session == null) {
- try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jmsSessions.put(destinationJNDIname, session);
+ try {
+ Destination dest = (Destination) context.lookup(destinationJNDIname);
+ if (dest instanceof Topic) {
+ session = ((TopicConnection) connection).
+ createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ } else {
+ session = ((QueueConnection) connection).
+ createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ jmsSessions.put(destinationJNDIname, session);
+
+ } catch (NamingException e) {
+ handleException("Error looking up destination : " + destinationJNDIname, e);
} catch (JMSException e) {
handleException("Unable to create a session using connection factory : " + name, e);
}
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=573445&r1=573444&r2=573445&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Thu Sep 6 21:21:17 2007
@@ -35,6 +35,8 @@
import javax.jms.*;
import javax.activation.DataHandler;
+import javax.naming.Context;
+import javax.naming.NamingException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.*;
@@ -117,8 +119,39 @@
jmsOut.loadConnectionFactoryFromProperies();
try {
// create a one time connection and session to be used
- connection = jmsOut.getConnectionFactory().createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Hashtable jndiProps = jmsOut.getProperties();
+ String user = (String) jndiProps.get(Context.SECURITY_PRINCIPAL);
+ String pass = (String) jndiProps.get(Context.SECURITY_CREDENTIALS);
+
+ QueueConnectionFactory qConFac = null;
+ TopicConnectionFactory tConFac = null;
+ if (jmsOut.getConnectionFactory() instanceof QueueConnectionFactory) {
+ qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
+ } else {
+ tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
+ }
+
+ if (user != null && pass != null) {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection(user, pass);
+ } else {
+ connection = tConFac.createTopicConnection(user, pass);
+ }
+ } else {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection();
+ } else {
+ connection = tConFac.createTopicConnection();
+ }
+ }
+
+ if (qConFac != null) {
+ session = ((QueueConnection)connection).
+ createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ } else {
+ session = ((TopicConnection)connection).
+ createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
} catch (JMSException e) {
handleException("Error creating a connection/session for : " + targetAddress);
@@ -416,6 +449,14 @@
JMSConnectionFactory jmsConFactory =
new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
+
+ try {
+ jmsConFactory.connectAndListen();
+ } catch (NamingException e) {
+ log.warn("Error looking up JMS connection factory : " + jmsConFactory.getName(), e);
+ } catch (JMSException e) {
+ log.warn("Error connecting to JMS connection factory : " + jmsConFactory.getName(), e);
+ }
connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
}
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=573445&r1=573444&r2=573445&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Thu Sep 6 21:21:17 2007
@@ -238,6 +238,8 @@
Context.SECURITY_CREDENTIALS, (String) p.getValue());
} else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) {
jmsConFactory.setConnFactoryJNDIName((String) p.getValue());
+ jmsConFactory.addJNDIContextProperty(
+ JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue());
}
}
}
@@ -361,13 +363,20 @@
*/
public static void sendMessageToJMSDestination(Session session,
Destination destination, Message message) throws AxisFault {
+
MessageProducer producer = null;
try {
- producer = session.createProducer(destination);
if (log.isDebugEnabled()) {
log.debug("Sending message to destination : " + destination);
}
- producer.send(message);
+
+ if (destination instanceof Queue) {
+ producer = ((QueueSession) session).createSender((Queue) destination);
+ ((QueueSender) producer).send(message);
+ } else {
+ producer = ((TopicSession) session).createPublisher((Topic) destination);
+ ((TopicPublisher) producer).publish(message);
+ }
} catch (JMSException e) {
handleException("Error creating a producer or sending to : " + destination, e);
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org