You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/09/12 08:16:29 UTC
svn commit: r574793 - in /webservices/synapse/trunk/java/modules:
core/src/main/java/org/apache/synapse/mediators/builtin/
transports/src/main/java/org/apache/axis2/transport/base/
transports/src/main/java/org/apache/axis2/transport/jms/
Author: asankha
Date: Tue Sep 11 23:16:25 2007
New Revision: 574793
URL: http://svn.apache.org/viewvc?rev=574793&view=rev
Log:
More JMS 1.0.2b support conversions from 1.1 and minor fixes
Also use a message selector on the JMSCorrelationID when waiting for a response on a destination
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java Tue Sep 11 23:16:25 2007
@@ -100,7 +100,7 @@
Axis2MessageContext axis2smc = (Axis2MessageContext) smc;
org.apache.axis2.context.MessageContext axis2MessageCtx =
axis2smc.getAxis2MessageContext();
- axis2MessageCtx.setProperty(name, resultValue);
+ axis2MessageCtx.getOptions().setProperty(name, resultValue);
} else if (Constants.SCOPE_TRANSPORT.equals(scope)
&& smc instanceof Axis2MessageContext) {
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java Tue Sep 11 23:16:25 2007
@@ -203,7 +203,6 @@
}
} catch (Exception e) {
- e.printStackTrace();
envelope = handleLegacyMessage(msgContext, message);
}
}
@@ -257,7 +256,7 @@
OMTextImpl textData = (OMTextImpl) soapFactory.createOMText(textPayload);
if (wrapperQName == null) {
- wrapperQName = BaseConstants.DEFAULT_BINARY_WRAPPER;
+ wrapperQName = BaseConstants.DEFAULT_TEXT_WRAPPER;
}
wrapper = soapFactory.createOMElement(wrapperQName, null);
wrapper.addChild(textData);
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Tue Sep 11 23:16:25 2007
@@ -133,7 +133,7 @@
try {
log.info("Creating a JMS Queue with the JNDI name : " + destinationJNDIName +
" using the connection factory definition named : " + name);
- JMSUtils.createJMSQueue(conFactory.createConnection(), destinationJNDIName);
+ JMSUtils.createDestination(conFactory, destinationJNDIName);
destinationName = getPhysicalDestinationName(destinationJNDIName);
@@ -298,7 +298,7 @@
}
try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE);
Destination destination = null;
try {
@@ -306,10 +306,10 @@
} catch (NameNotFoundException e) {
log.warn("Cannot find destination : " + destinationJNDIname + ". Creating a Queue");
- destination = session.createQueue(destinationJNDIname);
+ destination = JMSUtils.createDestination(session, destinationJNDIname);
}
- MessageConsumer consumer = session.createConsumer(destination);
+ MessageConsumer consumer = JMSUtils.createConsumer(session, destination);
consumer.setMessageListener(jmsMessageReceiver);
jmsSessions.put(destinationJNDIname, session);
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Tue Sep 11 23:16:25 2007
@@ -19,6 +19,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMText;
import org.apache.axiom.om.OMNode;
+import org.apache.axiom.om.util.UUIDGenerator;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ConfigurationContext;
@@ -34,6 +35,7 @@
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
+import javax.jms.Queue;
import javax.activation.DataHandler;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -169,9 +171,13 @@
destination = jmsOut.getDestination();
}
- String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY);
+ String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
if (replyDestName != null) {
- replyDestination = jmsOut.getReplyDestination(replyDestName);
+ if (jmsConnectionFactory != null) {
+ replyDestination = jmsConnectionFactory.getDestination(replyDestName);
+ } else {
+ replyDestination = jmsOut.getReplyDestination(replyDestName);
+ }
}
// now we are going to use the JMS session, but if this was a session from a
@@ -181,6 +187,7 @@
// convert the axis message context into a JMS Message that we can send over JMS
Message message = null;
+ String correlationId = null;
try {
message = createJMSMessage(msgCtx, session);
} catch (JMSException e) {
@@ -198,6 +205,14 @@
if (waitForResponse) {
replyDestination = JMSUtils.setReplyDestination(
replyDestination, session, message);
+ // force the use of a JMS correlation ID if synchronous
+ try {
+ correlationId = message.getJMSCorrelationID();
+ if (correlationId == null) {
+ correlationId = UUIDGenerator.getUUID();
+ message.setJMSCorrelationID(correlationId);
+ }
+ } catch (JMSException ignore) {}
}
// send the outgoing message over JMS to the destination selected
@@ -205,7 +220,7 @@
// if we are expecting a synchronous response back for the message sent out
if (waitForResponse) {
- waitForResponseAndProcess(session, replyDestination, msgCtx);
+ waitForResponseAndProcess(session, replyDestination, msgCtx, correlationId);
}
}
@@ -228,9 +243,25 @@
* @throws AxisFault on error
*/
private void waitForResponseAndProcess(Session session, Destination replyDestination,
- MessageContext msgCtx) throws AxisFault {
+ MessageContext msgCtx, String correlationId) throws AxisFault {
+
try {
- MessageConsumer consumer = session.createConsumer(replyDestination);
+ MessageConsumer consumer = null;
+ if (replyDestination instanceof Queue) {
+ if (correlationId != null) {
+ consumer = ((QueueSession) session).createReceiver((Queue) replyDestination,
+ "JMSCorrelationID = '" + correlationId + "'");
+ } else {
+ consumer = ((QueueSession) session).createReceiver((Queue) replyDestination);
+ }
+ } else {
+ if (correlationId != null) {
+ consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination,
+ correlationId, false);
+ } else {
+ consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination);
+ }
+ }
// how long are we willing to wait for the sync response
long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
@@ -241,7 +272,8 @@
if (log.isDebugEnabled()) {
log.debug("Waiting for a maximum of " + timeout +
- "ms for a response message to destination : " + replyDestination);
+ "ms for a response message to destination : " + replyDestination +
+ " with JMS correlation ID : " + correlationId);
}
Message reply = consumer.receive(timeout);
@@ -250,7 +282,8 @@
} else {
log.warn("Did not receive a JMS response within " +
- timeout + " ms to destination : " + replyDestination);
+ timeout + " ms to destination : " + replyDestination +
+ " with JMS correlation ID : " + correlationId);
}
} catch (JMSException e) {
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Tue Sep 11 23:16:25 2007
@@ -50,6 +50,7 @@
import javax.naming.Context;
import java.io.*;
import java.util.*;
+import java.nio.ByteBuffer;
/**
* Miscellaneous methods used for the JMS transport
@@ -75,9 +76,9 @@
*/
public static String createJMSQueue(Connection con, String destinationJNDIName) throws JMSException {
try {
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSession session = ((QueueConnection) con).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(destinationJNDIName);
- log.info("JMS Destination with JNDI name : " + destinationJNDIName + " created");
+ log.info("JMS Queue with JNDI name : " + destinationJNDIName + " created");
return queue.getQueueName();
} finally {
@@ -88,6 +89,29 @@
}
/**
+ * Create a JMS Topic using the given connection with the JNDI destination name, and return the
+ * JMS Destination name of the created Topic
+ *
+ * @param con the JMS Connection to be used
+ * @param destinationJNDIName the JNDI name of the Topic to be created
+ * @return the JMS Destination name of the created Topic
+ * @throws JMSException on error
+ */
+ public static String createJMSTopic(Connection con, String destinationJNDIName) throws JMSException {
+ try {
+ TopicSession session = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(destinationJNDIName);
+ log.info("JMS Topic with JNDI name : " + destinationJNDIName + " created");
+ return topic.getTopicName();
+
+ } finally {
+ try {
+ con.close();
+ } catch (JMSException ignore) {}
+ }
+ }
+
+ /**
* Should this service be enabled over the JMS transport?
*
* @param service the Axis service
@@ -301,7 +325,7 @@
if (replyDestination == null) {
try {
// create temporary queue to receive the reply
- replyDestination = session.createTemporaryQueue();
+ replyDestination = createTemporaryDestination(session);
} catch (JMSException e) {
handleException("Error creating temporary queue for response");
}
@@ -343,7 +367,7 @@
}
try {
- destination = session.createQueue(name);
+ destination = createDestination(session, name);
} catch (JMSException e) {
handleException("Error creating destination Queue : " + name, e);
}
@@ -604,18 +628,79 @@
}
public byte[] getMessageBinaryPayload(Object message) {
+
if (message instanceof BytesMessage) {
BytesMessage bytesMessage = (BytesMessage) message;
- byte[] msgBytes;
+ ByteBuffer msgBytes = ByteBuffer.allocate(1024);
try {
- msgBytes = new byte[(int) bytesMessage.getBodyLength()];
- bytesMessage.reset();
- bytesMessage.readBytes(msgBytes);
+ while (true) {
+ byte[] temp = new byte[1024];
+ int read = bytesMessage.readBytes(temp);
+ if (read > 0) {
+ msgBytes.put(temp, 0, read);
+ } else {
+ msgBytes.flip();
+ return msgBytes.array();
+ }
+ }
} catch (JMSException e) {
handleException("Error reading JMS binary message payload", e);
}
}
return null;
+ }
+
+ // ----------- JMS 1.0.2b compatibility methods -------------
+ public static Session createSession(Connection con,
+ boolean transacted, int acknowledgeMode) throws JMSException {
+
+ if (con instanceof QueueConnection) {
+ return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
+ } else {
+ return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode);
+ }
+ }
+
+ public static Destination createDestination(Session session, String destName)
+ throws JMSException {
+
+ if (session instanceof QueueSession) {
+ return ((QueueSession) session).createQueue(destName);
+ } else {
+ return ((TopicSession) session).createTopic(destName);
+ }
+ }
+
+ public static void createDestination(ConnectionFactory conFactory,
+ String destinationJNDIName) throws JMSException {
+
+ if (conFactory instanceof QueueConnectionFactory) {
+ JMSUtils.createJMSQueue(
+ ((QueueConnectionFactory) conFactory).createQueueConnection(),
+ destinationJNDIName);
+ } else {
+ JMSUtils.createJMSTopic(
+ ((TopicConnectionFactory) conFactory).createTopicConnection(),
+ destinationJNDIName);
+ }
+ }
+ public static MessageConsumer createConsumer(Session session, Destination dest)
+ throws JMSException {
+
+ if (dest instanceof Queue) {
+ return ((QueueSession) session).createReceiver((Queue) dest);
+ } else {
+ return ((TopicSession) session).createSubscriber((Topic) dest);
+ }
+ }
+
+ public static Destination createTemporaryDestination(Session session) throws JMSException {
+
+ if (session instanceof QueueSession) {
+ return ((QueueSession) session).createTemporaryQueue();
+ } else {
+ return ((TopicSession) session).createTemporaryTopic();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org