You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by se...@apache.org on 2009/01/03 07:33:41 UTC
svn commit: r730924 [3/9] - in
/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules:
base/src/main/java/org/apache/axis2/format/
base/src/main/java/org/apache/axis2/transport/base/
base/src/main/java/org/apache/axis2/transport/base/data...
Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java?rev=730924&r1=730923&r2=730924&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (original)
+++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java Fri Jan 2 22:33:39 2009
@@ -15,81 +15,85 @@
*/
package org.apache.axis2.transport.jms;
+import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
-import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.axis2.transport.base.BaseUtils;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.MetricsCollector;
+import org.apache.axis2.transport.jms.ctype.ContentTypeInfo;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.xml.namespace.QName;
+import javax.transaction.UserTransaction;
/**
- * This is the actual receiver which listens for and accepts JMS messages, and
- * hands them over to be processed by a worker thread. An instance of this
- * class is created for each JMSConnectionFactory, but all instances may and
- * will share the same worker thread pool held by the JMSListener
+ * This is the JMS message receiver which is invoked when a message is received. This processes
+ * the message through the engine
*/
-public class JMSMessageReceiver implements MessageListener {
+public class JMSMessageReceiver {
private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
/** The JMSListener */
private JMSListener jmsListener = null;
- /** The thread pool of workers */
- private WorkerPool workerPool = null;
- /** The Axis configuration context */
- private ConfigurationContext cfgCtx = null;
- /** A reference to the JMS Connection Factory to which this applies */
+ /** A reference to the JMS Connection Factory */
private JMSConnectionFactory jmsConnectionFactory = null;
- /** The name of the service this message receiver is bound to. */
- final String serviceName;
- /** Metrics collector */
+ /** The JMS metrics collector */
private MetricsCollector metrics = null;
+ /** The endpoint this message receiver is bound to */
+ final JMSEndpoint endpoint;
/**
* Create a new JMSMessage receiver
*
* @param jmsListener the JMS transport Listener
- * @param jmsConFac the JMS connection factory we are associated with
- * @param workerPool the worker thread pool to be used
- * @param cfgCtx the axis ConfigurationContext
+ * @param jmsConFac the JMS connection factory we are associated with
+ * @param workerPool the worker thread pool to be used
+ * @param cfgCtx the axis ConfigurationContext
* @param serviceName the name of the Axis service
+ * @param endpoint the JMSEndpoint definition to be used
*/
- JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
- WorkerPool workerPool, ConfigurationContext cfgCtx, String serviceName) {
+ JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
this.jmsListener = jmsListener;
this.jmsConnectionFactory = jmsConFac;
- this.workerPool = workerPool;
- this.cfgCtx = cfgCtx;
- this.serviceName = serviceName;
+ this.endpoint = endpoint;
this.metrics = jmsListener.getMetricsCollector();
}
/**
- * The entry point on the reception of each JMS message
+ * Process a new message received
*
* @param message the JMS message received
+ * @param ut UserTransaction which was used to receive the message
+ * @return true if caller should commit
*/
- public void onMessage(Message message) {
- // directly create a new worker and delegate processing
+ public boolean onMessage(Message message, UserTransaction ut) {
+
try {
if (log.isDebugEnabled()) {
StringBuffer sb = new StringBuffer();
- sb.append("Received JMS message to destination : " + message.getJMSDestination());
- sb.append("\nMessage ID : " + message.getJMSMessageID());
- sb.append("\nCorrelation ID : " + message.getJMSCorrelationID());
- sb.append("\nReplyTo ID : " + message.getJMSReplyTo());
+ sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
+ sb.append("\nDestination : ").append(message.getJMSDestination());
+ sb.append("\nMessage ID : ").append(message.getJMSMessageID());
+ sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
+ sb.append("\nReplyTo : ").append(message.getJMSReplyTo());
+ sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered());
+ sb.append("\nPriority : ").append(message.getJMSPriority());
+ sb.append("\nExpiration : ").append(message.getJMSExpiration());
+ sb.append("\nTimestamp : ").append(message.getJMSTimestamp());
+ sb.append("\nMessage Type : ").append(message.getJMSType());
+ sb.append("\nPersistent ? : ").append(
+ DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
+
log.debug(sb.toString());
if (log.isTraceEnabled() && message instanceof TextMessage) {
- log.trace("\nMessage : " + ((TextMessage) message).getText());
+ log.trace("\nMessage : " + ((TextMessage) message).getText());
}
}
} catch (JMSException e) {
@@ -100,134 +104,130 @@
// update transport level metrics
try {
- if (message instanceof BytesMessage) {
- metrics.incrementBytesReceived((JMSUtils.getBodyLength((BytesMessage) message)));
- } else if (message instanceof MapMessage) {
- metrics.incrementBytesReceived((JMSUtils.getBodyLength((MapMessage) message)));
- } else if (message instanceof TextMessage) {
- metrics.incrementBytesReceived(((TextMessage) message).getText().getBytes().length);
- } else {
- handleException("Unsupported JMS message type : " + message.getClass().getName());
- }
+ metrics.incrementBytesReceived(JMSUtils.getMessageSize(message));
} catch (JMSException e) {
log.warn("Error reading JMS message size to update transport metrics", e);
}
// has this message already expired? expiration time == 0 means never expires
try {
- long expiryTime = message.getJMSExpiration();
+ long expiryTime = message.getJMSExpiration();
if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
if (log.isDebugEnabled()) {
log.debug("Discard expired message with ID : " + message.getJMSMessageID());
}
- return;
+ return true;
}
} catch (JMSException ignore) {}
- workerPool.execute(new Worker(message));
- }
- private void handleException(String msg, Exception e) {
- log.error(msg, e);
- throw new AxisJMSException(msg, e);
- }
+ boolean successful = false;
+ try {
+ successful = processThoughEngine(message, ut);
- private void handleException(String msg) {
- log.error(msg);
- throw new AxisJMSException(msg);
- }
+ } catch (JMSException e) {
+ log.error("JMS Exception encountered while processing", e);
+ } catch (AxisFault e) {
+ log.error("Axis fault processing message", e);
+ } catch (Exception e) {
+ log.error("Unknown error processing message", e);
+ } finally {
+ if (successful) {
+ metrics.incrementMessagesReceived();
+ } else {
+ metrics.incrementFaultsReceiving();
+ }
+ }
+
+ return successful;
+ }
/**
- * The actual Worker implementation which will process the
- * received JMS messages in the worker thread pool
+ * Process the new message through Axis2
+ *
+ * @param message the JMS message
+ * @param ut the UserTransaction used for receipt
+ * @return true if the caller should commit
+ * @throws JMSException, on JMS exceptions
+ * @throws AxisFault on Axis2 errors
*/
- class Worker implements Runnable {
+ private boolean processThoughEngine(Message message, UserTransaction ut)
+ throws JMSException, AxisFault {
- private Message message = null;
+ MessageContext msgContext = jmsListener.createMessageContext();
- Worker(Message message) {
- this.message = message;
+ // set the JMS Message ID as the Message ID of the MessageContext
+ try {
+ msgContext.setMessageID(message.getJMSMessageID());
+ msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
+ } catch (JMSException ignore) {}
+
+ String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
+
+ AxisService service = endpoint.getService();
+ msgContext.setAxisService(service);
+
+ // find the operation for the message, or default to one
+ Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
+ QName operationQName = (
+ operationParam != null ?
+ BaseUtils.getQNameFromString(operationParam.getValue()) :
+ BaseConstants.DEFAULT_OPERATION);
+
+ AxisOperation operation = service.getOperation(operationQName);
+ if (operation != null) {
+ msgContext.setAxisOperation(operation);
+ msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
}
- public void run() {
-
- MessageContext msgContext = jmsListener.createMessageContext();
-
- // set the JMS Message ID as the Message ID of the MessageContext
- try {
- msgContext.setMessageID(message.getJMSMessageID());
- msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
- } catch (JMSException ignore) {}
-
- AxisService service = null;
- try {
- String soapAction = JMSUtils.
- getProperty(message, BaseConstants.SOAPACTION);
-
- // set to bypass dispatching if we know the service - we already should!
- if (serviceName != null) {
- service = cfgCtx.getAxisConfiguration().getService(serviceName);
- msgContext.setAxisService(service);
-
- // find the operation for the message, or default to one
- Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
- QName operationQName = (
- operationParam != null ?
- BaseUtils.getQNameFromString(operationParam.getValue()) :
- BaseConstants.DEFAULT_OPERATION);
-
- AxisOperation operation = service.getOperation(operationQName);
- if (operation != null) {
- msgContext.setAxisOperation(operation);
- msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
- }
- }
+ ContentTypeInfo contentTypeInfo =
+ endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
+ if (contentTypeInfo == null) {
+ throw new AxisFault("Unable to determine content type for message " +
+ msgContext.getMessageID());
+ }
- // set the message property OUT_TRANSPORT_INFO
- // the reply is assumed to be over the JMSReplyTo destination, using
- // the same incoming connection factory, if a JMSReplyTo is available
- if (message.getJMSReplyTo() != null) {
- msgContext.setProperty(
- Constants.OUT_TRANSPORT_INFO,
- new JMSOutTransportInfo(jmsConnectionFactory, message.getJMSReplyTo()));
-
- } else if (service != null) {
- // does the service specify a default reply destination ?
- Parameter param = service.getParameter(JMSConstants.REPLY_PARAM);
- if (param != null && param.getValue() != null) {
- msgContext.setProperty(
- Constants.OUT_TRANSPORT_INFO,
- new JMSOutTransportInfo(
- jmsConnectionFactory,
- jmsConnectionFactory.getDestination((String) param.getValue())));
- }
- }
+ // set the message property OUT_TRANSPORT_INFO
+ // the reply is assumed to be over the JMSReplyTo destination, using
+ // the same incoming connection factory, if a JMSReplyTo is available
+ Destination replyTo = message.getJMSReplyTo();
+ if (replyTo == null) {
+ // does the service specify a default reply destination ?
+ Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
+ if (param != null && param.getValue() != null) {
+ replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
+ }
- String contentType = null;
- if (service != null) {
- contentType = (String)service.getParameterValue(JMSConstants.CONTENT_TYPE_PARAM);
- }
- if (contentType == null) {
- contentType
- = JMSUtils.getProperty(message, BaseConstants.CONTENT_TYPE);
- }
-
- JMSUtils.setSOAPEnvelope(message, msgContext, contentType);
+ }
+ if (replyTo != null) {
+ msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+ new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
+ contentTypeInfo.getPropertyName()));
+ }
- jmsListener.handleIncomingMessage(
- msgContext,
- JMSUtils.getTransportHeaders(message),
- soapAction,
- contentType
- );
- metrics.incrementMessagesReceived();
+ JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
+ if (ut != null) {
+ msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
+ }
- } catch (Throwable e) {
- metrics.incrementFaultsReceiving();
- jmsListener.error(service, e);
- log.error("Exception while processing incoming message", e);
+ try {
+ jmsListener.handleIncomingMessage(
+ msgContext,
+ JMSUtils.getTransportHeaders(message),
+ soapAction,
+ contentTypeInfo.getContentType());
+
+ } finally {
+
+ Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
+ if (o != null) {
+ if ((o instanceof Boolean && ((Boolean) o)) ||
+ (o instanceof String && Boolean.valueOf((String) o))) {
+ return false;
+ }
}
+ return true;
}
}
}
Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=730924&r1=730923&r2=730924&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (original)
+++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Fri Jan 2 22:33:39 2009
@@ -16,6 +16,7 @@
package org.apache.axis2.transport.jms;
import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.base.BaseUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,8 +41,8 @@
/** The naming context */
private Context context;
/**
- * this is a reference to the underlying JMS connection factory when sending messages
- * through connection factories not defined to the transport sender
+ * this is a reference to the underlying JMS ConnectionFactory when sending messages
+ * through connection factories not defined at the TransportSender level
*/
private ConnectionFactory connectionFactory = null;
/**
@@ -52,31 +53,39 @@
private JMSConnectionFactory jmsConnectionFactory = null;
/** the Destination queue or topic for the outgoing message */
private Destination destination = null;
- /** the Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
- private String destinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+ /** the Destination queue or topic for the outgoing message
+ * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+ */
+ private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
/** the Reply Destination queue or topic for the outgoing message */
private Destination replyDestination = null;
/** the Reply Destination name */
private String replyDestinationName = null;
- /** the Reply Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
- private String replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+ /** the Reply Destination queue or topic for the outgoing message
+ * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+ */
+ private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
/** the EPR properties when the out-transport info is generated from a target EPR */
private Hashtable<String,String> properties = null;
/** the target EPR string where applicable */
private String targetEPR = null;
- private String contentType = null;
-
+ /** the message property name that stores the content type of the outgoing message */
+ private String contentTypeProperty;
+
/**
* Creates an instance using the given JMS connection factory and destination
*
* @param jmsConnectionFactory the JMS connection factory
* @param dest the destination
+ * @param contentTypeProperty
*/
- JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest) {
+ JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest,
+ String contentTypeProperty) {
this.jmsConnectionFactory = jmsConnectionFactory;
this.destination = dest;
destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC
: JMSConstants.DESTINATION_TYPE_QUEUE;
+ this.contentTypeProperty = contentTypeProperty;
}
/**
@@ -85,28 +94,31 @@
* @param targetEPR the target EPR
*/
JMSOutTransportInfo(String targetEPR) {
+
this.targetEPR = targetEPR;
if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) {
handleException("Invalid prefix for a JMS EPR : " + targetEPR);
+
} else {
- properties = JMSUtils.getProperties(targetEPR);
- String destinationType = properties.get(JMSConstants.DEST_PARAM_TYPE);
- if(destinationType != null) {
+ properties = BaseUtils.getEPRProperties(targetEPR);
+ String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE);
+ if (destinationType != null) {
setDestinationType(destinationType);
}
- String replyDestinationType = properties.get(JMSConstants.REPLY_PARAM_TYPE);
- if(replyDestinationType != null) {
+
+ String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE);
+ if (replyDestinationType != null) {
setReplyDestinationType(replyDestinationType);
}
- String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
- if(replyDestinationName != null) {
- setReplyDestinationName(replyDestinationName);
- }
+
+ replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
+ contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
try {
context = new InitialContext(properties);
} catch (NamingException e) {
handleException("Could not get an initial context using " + properties, e);
}
+
destination = getDestination(context, targetEPR);
replyDestination = getReplyDestination(context, targetEPR);
}
@@ -132,7 +144,7 @@
private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) {
try {
- String conFacJndiName = props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM);
+ String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME);
if (conFacJndiName != null) {
return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName);
} else {
@@ -156,8 +168,12 @@
try {
return JMSUtils.lookup(context, Destination.class, destinationName);
} catch (NameNotFoundException e) {
- if (log.isDebugEnabled()) {
- log.debug("Cannot locate destination : " + destinationName + " using " + url);
+ try {
+ return JMSUtils.lookup(context, Destination.class,
+ (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ?
+ "dynamicTopics/" : "dynamicQueues/") + destinationName);
+ } catch (NamingException x) {
+ handleException("Cannot locate destination : " + destinationName + " using " + url);
}
} catch (NamingException e) {
handleException("Cannot locate destination : " + destinationName + " using " + url, e);
@@ -173,10 +189,11 @@
* @return the JMS destination, or null if it does not exist
*/
private Destination getReplyDestination(Context context, String url) {
- String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
+ String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
if(replyDestinationName == null) {
return null;
}
+
try {
return JMSUtils.lookup(context, Destination.class, replyDestinationName);
} catch (NameNotFoundException e) {
@@ -186,13 +203,14 @@
} catch (NamingException e) {
handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e);
}
+
return null;
}
/**
* Look up for the given destination
- * @param replyDest
- * @return
+ * @param replyDest the JNDI name to lookup Destination required
+ * @return Destination for the JNDI name passed
*/
public Destination getReplyDestination(String replyDest) {
try {
@@ -232,7 +250,7 @@
}
public void setContentType(String contentType) {
- this.contentType = contentType;
+ // this is a useless Axis2 method imposed by the OutTransportInfo interface :(
}
public Hashtable<String,String> getProperties() {
@@ -276,4 +294,12 @@
public void setReplyDestinationName(String replyDestinationName) {
this.replyDestinationName = replyDestinationName;
}
+
+ public String getContentTypeProperty() {
+ return contentTypeProperty;
+ }
+
+ public void setContentTypeProperty(String contentTypeProperty) {
+ this.contentTypeProperty = contentTypeProperty;
+ }
}
Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=730924&r1=730923&r2=730924&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Fri Jan 2 22:33:39 2009
@@ -23,6 +23,7 @@
import org.apache.axiom.om.OMNode;
import org.apache.axiom.om.ds.MapDataSource;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.TransportOutDescription;
@@ -30,20 +31,19 @@
import org.apache.axis2.transport.MessageFormatter;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.*;
+import org.apache.axis2.transport.base.streams.WriterOutputStream;
import org.apache.axis2.transport.http.HTTPConstants;
-import org.apache.commons.logging.LogFactory;
import javax.jms.*;
-import javax.jms.Queue;
import javax.activation.DataHandler;
-import javax.naming.Context;
import javax.xml.stream.XMLStreamException;
-
import java.beans.XMLDecoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.nio.charset.UnsupportedCharsetException;
import java.util.*;
/**
@@ -51,34 +51,23 @@
*/
public class JMSSender extends AbstractTransportSender implements ManagementSupport {
- public static final String TRANSPORT_NAME = "jms";
-
- private JMSConnectionFactoryManager connFacManager;
+ public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
- public JMSSender() {
- log = LogFactory.getLog(JMSSender.class);
- }
+ /** The JMS connection factory manager to be used when sending messages out */
+ private JMSConnectionFactoryManager connFacManager;
/**
* Initialize the transport sender by reading pre-defined connection factories for
- * outgoing messages. These will create sessions (one per each destination dealth with)
- * to be used when messages are being sent.
+ * outgoing messages.
+ *
* @param cfgCtx the configuration context
* @param transportOut the transport sender definition from axis2.xml
* @throws AxisFault on error
*/
public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
super.init(cfgCtx, transportOut);
- connFacManager = new JMSConnectionFactoryManager(cfgCtx);
- // read the connection factory definitions and create them
- connFacManager.loadConnectionFactoryDefinitions(transportOut);
- connFacManager.start();
- }
-
- @Override
- public void stop() {
- connFacManager.stop();
- super.stop();
+ connFacManager = new JMSConnectionFactoryManager(transportOut);
+ log.info("JMS Transport Sender initialized...");
}
/**
@@ -90,9 +79,9 @@
*/
private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
Map<String,String> props = trpInfo.getProperties();
- if(trpInfo.getProperties() != null) {
- String jmsConnectionFactoryName = props.get(JMSConstants.CONFAC_PARAM);
- if(jmsConnectionFactoryName != null) {
+ if (trpInfo.getProperties() != null) {
+ String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
+ if (jmsConnectionFactoryName != null) {
return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
} else {
return connFacManager.getJMSConnectionFactory(props);
@@ -109,84 +98,85 @@
OutTransportInfo outTransportInfo) throws AxisFault {
JMSConnectionFactory jmsConnectionFactory = null;
- Connection connection = null; // holds a one time connection if used
JMSOutTransportInfo jmsOut = null;
- Session session = null;
- Destination replyDestination = null;
+ JMSMessageSender messageSender = null;
- try {
- if (targetAddress != null) {
+ if (targetAddress != null) {
- jmsOut = new JMSOutTransportInfo(targetAddress);
- // do we have a definition for a connection factory to use for this address?
- jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+ jmsOut = new JMSOutTransportInfo(targetAddress);
+ // do we have a definition for a connection factory to use for this address?
+ jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+
+ if (jmsConnectionFactory != null) {
+ messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
- if (jmsConnectionFactory != null) {
- // create new or get existing session to send to the destination from the CF
- session = jmsConnectionFactory.getSessionForDestination(
- JMSUtils.getDestination(targetAddress));
+ } else {
+ try {
+ messageSender = JMSUtils.createJMSSender(jmsOut);
+ } catch (JMSException e) {
+ handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+ }
+ }
- } else {
- // digest the targetAddress and locate CF from the EPR
- jmsOut.loadConnectionFactoryFromProperies();
- try {
- // create a one time connection and session to be used
- Hashtable<String,String> jndiProps = jmsOut.getProperties();
- String user = jndiProps.get(Context.SECURITY_PRINCIPAL);
- String pass = jndiProps.get(Context.SECURITY_CREDENTIALS);
-
- QueueConnectionFactory qConFac = null;
- TopicConnectionFactory tConFac = null;
-
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
- qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
- } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
- tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
- } else {
- handleException("Unable to determine type of JMS " +
- "Connection Factory - i.e Queue/Topic");
- }
+ } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
- if (user != null && pass != null) {
- if (qConFac != null) {
- connection = qConFac.createQueueConnection(user, pass);
- } else if (tConFac != null) {
- connection = tConFac.createTopicConnection(user, pass);
- }
- } else {
- if (qConFac != null) {
- connection = qConFac.createQueueConnection();
- } else if (tConFac != null) {
- connection = tConFac.createTopicConnection();
- }
- }
+ jmsOut = (JMSOutTransportInfo) outTransportInfo;
+ try {
+ messageSender = JMSUtils.createJMSSender(jmsOut);
+ } catch (JMSException e) {
+ handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+ }
+ }
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
- session = ((QueueConnection)connection).
- createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
- session = ((TopicConnection)connection).
- createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- }
+ // The message property to be used to send the content type is determined by
+ // the out transport info, i.e. either from the EPR if we are sending a request,
+ // or, if we are sending a response, from the configuration of the service that
+ // received the request). The property name can be overridden by a message
+ // context property.
+ String contentTypeProperty =
+ (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+ if (contentTypeProperty == null) {
+ contentTypeProperty = jmsOut.getContentTypeProperty();
+ }
- } catch (JMSException e) {
- handleException("Error creating a connection/session for : " + targetAddress, e);
- }
- }
- replyDestination = jmsOut.getReplyDestination();
+ // need to synchronize as Sessions are not thread safe
+ synchronized (messageSender.getSession()) {
+ try {
+ sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
+ } finally {
+ messageSender.close();
+ }
+ }
+ }
- } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
+ /**
+ * Perform actual sending of the JMS message
+ */
+ private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
+ String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
+ JMSOutTransportInfo jmsOut) throws AxisFault {
+
+ // convert the axis message context into a JMS Message that we can send over JMS
+ Message message = null;
+ String correlationId = null;
+ try {
+ message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
+ } catch (JMSException e) {
+ handleException("Error creating a JMS message from the message context", e);
+ }
- jmsOut = (JMSOutTransportInfo) outTransportInfo;
- jmsConnectionFactory = jmsOut.getJmsConnectionFactory();
+ // should we wait for a synchronous response on this same thread?
+ boolean waitForResponse = waitForSynchronousResponse(msgCtx);
+ Destination replyDestination = jmsOut.getReplyDestination();
- session = jmsConnectionFactory.getSessionForDestination(
- jmsOut.getDestination().toString());
- }
-
- Destination destination = jmsOut.getDestination();
+ // if this is a synchronous out-in, prepare to listen on the response destination
+ if (waitForResponse) {
String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
+ if (replyDestName == null && jmsConnectionFactory != null) {
+ replyDestName = jmsConnectionFactory.getReplyToDestination();
+ }
+
if (replyDestName != null) {
if (jmsConnectionFactory != null) {
replyDestination = jmsConnectionFactory.getDestination(replyDestName);
@@ -194,103 +184,45 @@
replyDestination = jmsOut.getReplyDestination(replyDestName);
}
}
+ replyDestination = JMSUtils.setReplyDestination(
+ replyDestination, messageSender.getSession(), message);
+ }
- if(session == null) {
- handleException("Could not create JMS session");
- }
-
- // now we are going to use the JMS session, but if this was a session from a
- // defined JMS connection factory, we need to synchronize as sessions are not
- // thread safe
- synchronized(session) {
-
- // convert the axis message context into a JMS Message that we can send over JMS
- Message message = null;
- String correlationId = null;
- try {
- message = createJMSMessage(msgCtx, session);
- } catch (JMSException e) {
- handleException("Error creating a JMS message from the axis message context", e);
- }
-
- String destinationType = jmsOut.getDestinationType();
-
- // if the destination does not exist, see if we can create it
- destination = JMSUtils.createDestinationIfRequired(
- destination, destinationType, targetAddress, session);
-
- if(jmsOut.getReplyDestinationName() != null) {
- replyDestination = JMSUtils.createReplyDestinationIfRequired(
- replyDestination, jmsOut.getReplyDestinationName(),
- jmsOut.getReplyDestinationType(), targetAddress, session);
- }
-
- // should we wait for a synchronous response on this same thread?
- boolean waitForResponse = waitForSynchronousResponse(msgCtx);
-
- // if this is a synchronous out-in, prepare to listen on the response destination
- if (waitForResponse) {
- replyDestination = JMSUtils.setReplyDestination(
- replyDestination, session, message);
- }
+ try {
+ messageSender.send(message, msgCtx);
+ metrics.incrementMessagesSent(msgCtx);
- // send the outgoing message over JMS to the destination selected
- try {
- JMSUtils.sendMessageToJMSDestination(session, destination, destinationType, message);
+ } catch (AxisJMSException e) {
+ metrics.incrementFaultsSending();
+ handleException("Error sending JMS message", e);
+ }
- // set the actual MessageID to the message context for use by any others
- try {
- String msgId = message.getJMSMessageID();
- if (msgId != null) {
- msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
- }
- } catch (JMSException ignore) {}
+ try {
+ metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message));
+ } catch (JMSException e) {
+ log.warn("Error reading JMS message size to update transport metrics", e);
+ }
- metrics.incrementMessagesSent();
- try {
- if (message instanceof BytesMessage) {
- metrics.incrementBytesSent(JMSUtils.getBodyLength((BytesMessage) message));
- } else if (message instanceof MapMessage) {
- metrics.incrementBytesSent((JMSUtils.getBodyLength((MapMessage) message)));
- } else if (message instanceof TextMessage) {
- metrics.incrementBytesSent((
- (TextMessage) message).getText().getBytes().length);
- } else {
- handleException("Unsupported JMS message type : " +
- message.getClass().getName());
- }
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
- } catch (BaseTransportException e) {
- metrics.incrementFaultsSending();
- throw e;
- }
+ // if we are expecting a synchronous response back for the message sent out
+ if (waitForResponse) {
+ // TODO ********************************************************************************
+ // TODO **** replace with asynchronous polling via a poller task to process this *******
+ // information would be given. Then it should poll (until timeout) the
+ // requested destination for the response message and inject it from a
+ // asynchronous worker thread
+ try {
+ messageSender.getConnection().start(); // multiple calls are safely ignored
+ } catch (JMSException ignore) {}
- // if we are expecting a synchronous response back for the message sent out
- if (waitForResponse) {
- if (connection != null) {
- try {
- connection.start();
- } catch (JMSException ignore) {}
- } else {
- // If connection is null, we are using a cached session and the underlying
- // connection is already started. Thus, there is nothing to do here.
- }
- try {
- correlationId = message.getJMSMessageID();
- } catch(JMSException ignore) {}
- waitForResponseAndProcess(session, replyDestination,
- jmsOut.getReplyDestinationType(), msgCtx, correlationId);
- }
- }
+ try {
+ correlationId = message.getJMSMessageID();
+ } catch(JMSException ignore) {}
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException ignore) {}
- }
+ // We assume here that the response uses the same message property to
+ // specify the content type of the message.
+ waitForResponseAndProcess(messageSender.getSession(), replyDestination,
+ msgCtx, correlationId, contentTypeProperty);
+ // TODO ********************************************************************************
}
}
@@ -301,28 +233,18 @@
* @param session the session to use to listen for the response
* @param replyDestination the JMS reply Destination
* @param msgCtx the outgoing message for which we are expecting the response
+ * @param contentTypeProperty the message property used to determine the content type
+ * of the response message
* @throws AxisFault on error
*/
private void waitForResponseAndProcess(Session session, Destination replyDestination,
- String replyDestinationType, MessageContext msgCtx, String correlationId) throws AxisFault {
+ MessageContext msgCtx, String correlationId,
+ String contentTypeProperty) throws AxisFault {
try {
- MessageConsumer consumer = null;
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(replyDestinationType)) {
- if (correlationId != null) {
- consumer = ((QueueSession) session).createReceiver((Queue) replyDestination,
- "JMSCorrelationID = '" + correlationId + "'");
- } else {
- consumer = ((QueueSession) session).createReceiver((Queue) replyDestination);
- }
- } else {
- if (correlationId != null) {
- consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination,
- "JMSCorrelationID = '" + correlationId + "'", false);
- } else {
- consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination);
- }
- }
+ MessageConsumer consumer;
+ consumer = JMSUtils.createConsumer(session, replyDestination,
+ "JMSCorrelationID = '" + correlationId + "'");
// how long are we willing to wait for the sync response
long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
@@ -344,23 +266,13 @@
// update transport level metrics
metrics.incrementMessagesReceived();
try {
- if (reply instanceof BytesMessage) {
- metrics.incrementBytesReceived(JMSUtils.getBodyLength((BytesMessage) reply));
- } else if (reply instanceof MapMessage) {
- metrics.incrementBytesReceived((JMSUtils.getBodyLength((MapMessage) reply)));
- } else if (reply instanceof TextMessage) {
- metrics.incrementBytesReceived((
- (TextMessage) reply).getText().getBytes().length);
- } else {
- handleException("Unsupported JMS message type : " +
- reply.getClass().getName());
- }
+ metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply));
} catch (JMSException e) {
log.warn("Error reading JMS message size to update transport metrics", e);
}
try {
- processSyncResponse(msgCtx, reply);
+ processSyncResponse(msgCtx, reply, contentTypeProperty);
metrics.incrementMessagesReceived();
} catch (AxisFault e) {
metrics.incrementFaultsReceiving();
@@ -376,8 +288,9 @@
} catch (JMSException e) {
metrics.incrementFaultsReceiving();
- handleException("Error creating consumer or receiving reply to : " +
- replyDestination, e);
+ handleException("Error creating a consumer, or receiving a synchronous reply " +
+ "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
+ " and reply Destination : " + replyDestination, e);
}
}
@@ -387,12 +300,14 @@
*
* @param msgContext the MessageContext
* @param session the JMS session
+ * @param contentTypeProperty the message property to be used to store the
+ * content type
* @return a JMS message from the context and session
* @throws JMSException on exception
* @throws AxisFault on exception
*/
- private Message createJMSMessage(MessageContext msgContext, Session session)
- throws JMSException, AxisFault {
+ private Message createJMSMessage(MessageContext msgContext, Session session,
+ String contentTypeProperty) throws JMSException, AxisFault {
Message message = null;
String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
@@ -416,20 +331,7 @@
String contentType = messageFormatter.getContentType(
msgContext, format, msgContext.getSoapAction());
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try {
- messageFormatter.writeTo(msgContext, format, baos, true);
- baos.flush();
- } catch (IOException e) {
- handleException("IO Error while creating BytesMessage", e);
- }
-
- if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
- contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
- message = session.createBytesMessage();
- BytesMessage bytesMsg = (BytesMessage) message;
- bytesMsg.writeBytes(baos.toByteArray());
- } else if (msgType != null && JMSConstants.JMS_MAP_MESSAGE.equals(msgType)) {
+ if (msgType != null && JMSConstants.JMS_MAP_MESSAGE.equals(msgType)) {
OMElement wrapper = msgContext.getEnvelope().getBody().getFirstElement();
if (wrapper != null && wrapper instanceof OMSourcedElement) {
OMSourcedElement omNode = (OMSourcedElement) wrapper;
@@ -499,15 +401,43 @@
}
}
} else {
- message = session.createTextMessage(); // default
- TextMessage txtMsg = (TextMessage) message;
+ boolean useBytesMessage =
+ msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
+ contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1;
+
+ OutputStream out;
+ StringWriter sw;
+ if (useBytesMessage) {
+ BytesMessage bytesMsg = session.createBytesMessage();
+ sw = null;
+ out = new BytesMessageOutputStream(bytesMsg);
+ message = bytesMsg;
+ } else {
+ sw = new StringWriter();
+ try {
+ out = new WriterOutputStream(sw, format.getCharSetEncoding());
+ } catch (UnsupportedCharsetException ex) {
+ handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
+ return null;
+ }
+ }
+
try {
- txtMsg.setText(new String(baos.toByteArray(), format.getCharSetEncoding()));
- } catch (UnsupportedEncodingException ex) {
- handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
+ messageFormatter.writeTo(msgContext, format, out, true);
+ out.close();
+ } catch (IOException e) {
+ handleException("IO Error while creating BytesMessage", e);
}
+
+ if (!useBytesMessage) {
+ TextMessage txtMsg = session.createTextMessage();
+ txtMsg.setText(sw.toString());
+ message = txtMsg;
+ }
+ }
+ if (contentTypeProperty != null) {
+ message.setStringProperty(contentTypeProperty, contentType);
}
- message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType);
} else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
message = session.createBytesMessage();
@@ -518,14 +448,12 @@
if (omNode != null && omNode instanceof OMText) {
Object dh = ((OMText) omNode).getDataHandler();
if (dh != null && dh instanceof DataHandler) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
- ((DataHandler) dh).writeTo(baos);
+ ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg));
} catch (IOException e) {
handleException("Error serializing binary content of element : " +
BaseConstants.DEFAULT_BINARY_WRAPPER, e);
}
- bytesMsg.writeBytes(baos.toByteArray());
}
}
@@ -534,6 +462,7 @@
TextMessage txtMsg = (TextMessage) message;
txtMsg.setText(msgContext.getEnvelope().getBody().
getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
+
} else if (JMSConstants.JMS_MAP_MESSAGE.equals(jmsPayloadType)) {
OMElement wrapper = msgContext.getEnvelope().getBody().
getFirstChildWithName(BaseConstants.DEFAULT_MAP_WRAPPER);
@@ -656,9 +585,12 @@
*
* @param outMsgCtx the outgoing message for which we are expecting the response
* @param message the JMS response message received
+ * @param contentTypeProperty the message property used to determine the content type
+ * of the response message
* @throws AxisFault on error
*/
- private void processSyncResponse(MessageContext outMsgCtx, Message message) throws AxisFault {
+ private void processSyncResponse(MessageContext outMsgCtx, Message message,
+ String contentTypeProperty) throws AxisFault {
MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
@@ -671,7 +603,9 @@
// workaround as Axis2 1.2 is about to be released and Synapse 1.0
responseMsgCtx.setServerSide(false);
- String contentType = JMSUtils.getProperty(message, BaseConstants.CONTENT_TYPE);
+ String contentType =
+ contentTypeProperty == null ? null
+ : JMSUtils.getProperty(message, contentTypeProperty);
try {
JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType);