You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/11/15 17:11:12 UTC
svn commit: r717873 [2/4] - in
/webservices/commons/trunk/scratch/asankha/transport/modules:
base/src/main/java/org/apache/axis2/transport/base/ jms/
jms/src/main/java/org/apache/axis2/transport/jms/ tests/
tests/src/test/java/org/apache/axis2/transpor...
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java?rev=717873&r1=717872&r2=717873&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java Sat Nov 15 08:11:11 2008
@@ -17,7 +17,6 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
-import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.axis2.transport.base.BaseUtils;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.MetricsCollector;
@@ -25,73 +24,76 @@
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.xml.namespace.QName;
+import javax.transaction.UserTransaction;
/**
- * This is the actual receiver which listens for and accepts JMS messages, and
- * hands them over to be processed by a worker thread. An instance of this
- * class is created for each JMSConnectionFactory, but all instances may and
- * will share the same worker thread pool held by the JMSListener
+ * This is the JMS message receiver which is invoked when a message is received. It processes
+ * the message through the Axis2 engine.
*/
-public class JMSMessageReceiver implements MessageListener {
+public class JMSMessageReceiver {
private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
/** The JMSListener */
private JMSListener jmsListener = null;
- /** The thread pool of workers */
- private WorkerPool workerPool = null;
- /** The Axis configuration context */
- private ConfigurationContext cfgCtx = null;
- /** A reference to the JMS Connection Factory to which this applies */
+ /** A reference to the JMS Connection Factory */
private JMSConnectionFactory jmsConnectionFactory = null;
- /** The endpoint this message receiver is bound to. */
- final JMSEndpoint endpoint;
- /** Metrics collector */
+ /** The JMS metrics collector */
private MetricsCollector metrics = null;
+ /** The endpoint this message receiver is bound to */
+ final JMSEndpoint endpoint;
/**
* Create a new JMSMessage receiver
*
* @param jmsListener the JMS transport Listener
- * @param jmsConFac the JMS connection factory we are associated with
- * @param workerPool the worker thread pool to be used
- * @param cfgCtx the axis ConfigurationContext
+ * @param jmsConFac the JMS connection factory we are associated with
+ * @param workerPool the worker thread pool to be used
+ * @param cfgCtx the axis ConfigurationContext
* @param serviceName the name of the Axis service
+ * @param endpoint the JMSEndpoint definition to be used
*/
- JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
- WorkerPool workerPool, ConfigurationContext cfgCtx, JMSEndpoint endpoint) {
+ JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
this.jmsListener = jmsListener;
this.jmsConnectionFactory = jmsConFac;
- this.workerPool = workerPool;
- this.cfgCtx = cfgCtx;
this.endpoint = endpoint;
this.metrics = jmsListener.getMetricsCollector();
}
/**
- * The entry point on the reception of each JMS message
+ * Process a new message received
*
* @param message the JMS message received
+ * @param ut UserTransaction which was used to receive the message
+ * @return true if caller should commit
*/
- public void onMessage(Message message) {
- // directly create a new worker and delegate processing
+ public boolean onMessage(Message message, UserTransaction ut) {
+
try {
if (log.isDebugEnabled()) {
StringBuffer sb = new StringBuffer();
- sb.append("Received JMS message to destination : " + message.getJMSDestination());
- sb.append("\nMessage ID : " + message.getJMSMessageID());
- sb.append("\nCorrelation ID : " + message.getJMSCorrelationID());
- sb.append("\nReplyTo ID : " + message.getJMSReplyTo());
+ sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
+ sb.append("\nDestination : ").append(message.getJMSDestination());
+ sb.append("\nMessage ID : ").append(message.getJMSMessageID());
+ sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
+ sb.append("\nReplyTo : ").append(message.getJMSReplyTo());
+ sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered());
+ sb.append("\nPriority : ").append(message.getJMSPriority());
+ sb.append("\nExpiration : ").append(message.getJMSExpiration());
+ sb.append("\nTimestamp : ").append(message.getJMSTimestamp());
+ sb.append("\nMessage Type : ").append(message.getJMSType());
+ sb.append("\nPersistent ? : ").append(
+ DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
+
log.debug(sb.toString());
if (log.isTraceEnabled() && message instanceof TextMessage) {
- log.trace("\nMessage : " + ((TextMessage) message).getText());
+ log.trace("\nMessage : " + ((TextMessage) message).getText());
}
}
} catch (JMSException e) {
@@ -109,112 +111,123 @@
// has this message already expired? expiration time == 0 means never expires
try {
- long expiryTime = message.getJMSExpiration();
+ long expiryTime = message.getJMSExpiration();
if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
if (log.isDebugEnabled()) {
log.debug("Discard expired message with ID : " + message.getJMSMessageID());
}
- return;
+ return true;
}
} catch (JMSException ignore) {}
- workerPool.execute(new Worker(message));
- }
- private void handleException(String msg, Exception e) {
- log.error(msg, e);
- throw new AxisJMSException(msg, e);
- }
+ boolean successful = false;
+ try {
+ successful = processThoughEngine(message, ut);
- private void handleException(String msg) {
- log.error(msg);
- throw new AxisJMSException(msg);
- }
+ } catch (JMSException e) {
+ log.error("JMS Exception encountered while processing", e);
+ } catch (AxisFault e) {
+ log.error("Axis fault processing message", e);
+ } catch (Exception e) {
+ log.error("Unknown error processing message", e);
+ } finally {
+ if (successful) {
+ metrics.incrementMessagesReceived();
+ } else {
+ metrics.incrementFaultsReceiving();
+ }
+ }
+
+ return successful;
+ }
/**
- * The actual Worker implementation which will process the
- * received JMS messages in the worker thread pool
+ * Process the new message through Axis2
+ *
+ * @param message the JMS message
+ * @param ut the UserTransaction used for receipt
+ * @return true if the caller should commit
+ * @throws JMSException on JMS exceptions
+ * @throws AxisFault on Axis2 errors
*/
- class Worker implements Runnable {
+ private boolean processThoughEngine(Message message, UserTransaction ut)
+ throws JMSException, AxisFault {
- private Message message = null;
+ MessageContext msgContext = jmsListener.createMessageContext();
- Worker(Message message) {
- this.message = message;
- }
+ // set the JMS Message ID as the Message ID of the MessageContext
+ try {
+ msgContext.setMessageID(message.getJMSMessageID());
+ msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
+ } catch (JMSException ignore) {}
- public void run() {
+ String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
- MessageContext msgContext = jmsListener.createMessageContext();
+ AxisService service = endpoint.getService();
+ msgContext.setAxisService(service);
- // set the JMS Message ID as the Message ID of the MessageContext
- try {
- msgContext.setMessageID(message.getJMSMessageID());
- msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
- } catch (JMSException ignore) {}
+ // find the operation for the message, or default to one
+ Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
+ QName operationQName = (
+ operationParam != null ?
+ BaseUtils.getQNameFromString(operationParam.getValue()) :
+ BaseConstants.DEFAULT_OPERATION);
+
+ AxisOperation operation = service.getOperation(operationQName);
+ if (operation != null) {
+ msgContext.setAxisOperation(operation);
+ msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
+ }
- AxisService service = null;
- try {
- String soapAction = JMSUtils.
- getProperty(message, BaseConstants.SOAPACTION);
+ ContentTypeInfo contentTypeInfo =
+ endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
+ if (contentTypeInfo == null) {
+ throw new AxisFault("Unable to determine content type for message " +
+ msgContext.getMessageID());
+ }
- service = endpoint.getService();
- msgContext.setAxisService(service);
+ // set the message property OUT_TRANSPORT_INFO
+ // the reply is assumed to be over the JMSReplyTo destination, using
+ // the same incoming connection factory, if a JMSReplyTo is available
+ Destination replyTo = message.getJMSReplyTo();
+ if (replyTo == null) {
+ // does the service specify a default reply destination ?
+ Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
+ if (param != null && param.getValue() != null) {
+ replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
+ }
- // find the operation for the message, or default to one
- Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
- QName operationQName = (
- operationParam != null ?
- BaseUtils.getQNameFromString(operationParam.getValue()) :
- BaseConstants.DEFAULT_OPERATION);
+ }
+ if (replyTo != null) {
+ msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+ new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
+ contentTypeInfo.getPropertyName()));
+ }
- AxisOperation operation = service.getOperation(operationQName);
- if (operation != null) {
- msgContext.setAxisOperation(operation);
- msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
- }
+ JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
+ if (ut != null) {
+ msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
+ }
- ContentTypeInfo contentTypeInfo =
- endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
- if (contentTypeInfo == null) {
- throw new AxisFault("Unable to determine content type for message " +
- msgContext.getMessageID());
- }
-
- // set the message property OUT_TRANSPORT_INFO
- // the reply is assumed to be over the JMSReplyTo destination, using
- // the same incoming connection factory, if a JMSReplyTo is available
- Destination replyTo = message.getJMSReplyTo();
- if (replyTo == null) {
- // does the service specify a default reply destination ?
- Parameter param = service.getParameter(JMSConstants.REPLY_PARAM);
- if (param != null && param.getValue() != null) {
- replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
- }
-
- }
- if (replyTo != null) {
- msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
- new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
- contentTypeInfo.getPropertyName()));
+ try {
+ jmsListener.handleIncomingMessage(
+ msgContext,
+ JMSUtils.getTransportHeaders(message),
+ soapAction,
+ contentTypeInfo.getContentType());
+
+ } finally {
+
+ Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
+ if (o != null) {
+ if ((o instanceof Boolean && ((Boolean) o)) ||
+ (o instanceof String && Boolean.valueOf((String) o))) {
+ return false;
}
-
- JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
-
- jmsListener.handleIncomingMessage(
- msgContext,
- JMSUtils.getTransportHeaders(message),
- soapAction,
- contentTypeInfo.getContentType()
- );
- metrics.incrementMessagesReceived();
-
- } catch (Throwable e) {
- metrics.incrementFaultsReceiving();
- jmsListener.error(service, e);
- log.error("Exception while processing incoming message", e);
}
+ return true;
}
}
}
Added: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java?rev=717873&view=auto
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java (added)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java Sat Nov 15 08:11:11 2008
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.transport.base.BaseConstants;
+
+import javax.jms.*;
+import javax.transaction.*;
+
+/**
+ * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction
+ * (if requested) or the local session transaction, if used. An instance of this class is unique
+ * to a single message send out operation and will not be shared.
+ */
+public class JMSMessageSender {
+
+ private static final Log log = LogFactory.getLog(JMSMessageSender.class);
+
+ /** The Connection to be used to send out */
+ private Connection connection = null;
+ /** The Session to be used to send out */
+ private Session session = null;
+ /** The MessageProducer used */
+ private MessageProducer producer = null;
+ /** Target Destination */
+ private Destination destination = null;
+ /** The level of cacheability for resources */
+ private int cacheLevel = JMSConstants.CACHE_CONNECTION;
+ /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */
+ private boolean jmsSpec11 = true;
+ /** Are we sending to a Queue ? */
+ private Boolean isQueue = null;
+
+    /**
+     * Creates a sender around JMS resources which the caller has already obtained. This is the
+     * low-level constructor that supports one-time sends, including sends through the
+     * JMS 1.0.2b API.
+     *
+     * @param connection the JMS Connection
+     * @param session JMS Session
+     * @param producer the MessageProducer
+     * @param destination the JMS Destination
+     * @param cacheLevel cacheLevel - None | Connection | Session | Producer
+     * @param jmsSpec11 true if the JMS 1.1 API should be used
+     * @param isQueue posting to a Queue? May be null, in which case the generic
+     *                MessageProducer API is used for the send
+     */
+    public JMSMessageSender(Connection connection, Session session, MessageProducer producer,
+        Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) {
+
+        // API level, destination kind and caching policy
+        this.jmsSpec11 = jmsSpec11;
+        this.isQueue = isQueue;
+        this.cacheLevel = cacheLevel;
+
+        // JMS resources handed over by the caller
+        this.destination = destination;
+        this.producer = producer;
+        this.session = session;
+        this.connection = connection;
+    }
+
+    /**
+     * Creates a sender for a target EPR, taking the JMS resources from a JMSConnectionFactory.
+     * <p>
+     * With a non-null factory, the cache level, JMS API level, Connection, Session and
+     * MessageProducer are all obtained from it. The shared destination of the factory is used
+     * when it has one; otherwise the destination is resolved from the target address.
+     *
+     * @param jmsConnectionFactory the JMSConnectionFactory
+     * @param targetAddress target EPR
+     */
+    public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) {
+
+        if (jmsConnectionFactory == null) {
+            // NOTE(review): the transport info built here is discarded, so the connection,
+            // session, destination and producer of this sender all stay null on this path -
+            // confirm whether callers are expected to supply them through the setters
+            JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress);
+            jmsOut.loadConnectionFactoryFromProperies();
+            return;
+        }
+
+        this.cacheLevel = jmsConnectionFactory.getCacheLevel();
+        this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11();
+        this.connection = jmsConnectionFactory.getConnection();
+        this.session = jmsConnectionFactory.getSession(connection);
+
+        // prefer the destination shared by the factory, else resolve it from the EPR
+        if (jmsConnectionFactory.getSharedDestination() == null) {
+            this.destination =
+                jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress));
+        } else {
+            this.destination = jmsConnectionFactory.getSharedDestination();
+        }
+
+        this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination);
+    }
+
+    /**
+     * Sends the given JMS message through the MessageProducer of this sender, and then
+     * completes the transaction the send took part in.
+     * <p>
+     * The delivery mode, priority and time-to-live of the producer are taken from the
+     * JMS_DELIVERY_MODE, JMS_PRIORITY and JMS_TIME_TO_LIVE message context properties, when
+     * these are set. After the send attempt, if JTA_COMMIT_AFTER_SEND is set (or the message is
+     * marked SET_ROLLBACK_ONLY) and a UserTransaction is found in the message context, that JTA
+     * transaction is committed or rolled back. If neither applies and the JMS Session is
+     * transacted, the local session transaction is committed or rolled back instead.
+     * <p>
+     * A JMSException raised by the send itself is logged and not rethrown; any transaction
+     * handled here is rolled back in that case.
+     *
+     * @param message the JMS message
+     * @param msgCtx the Axis2 MessageContext
+     * @throws AxisJMSException if a producer setting cannot be applied, or if the commit or
+     *         rollback of the transaction fails
+     */
+    public void send(Message message, MessageContext msgCtx) {
+
+        Boolean jtaCommit    = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND);
+        Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY);
+        Boolean persistent   = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE);
+        Integer priority     = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY);
+        Integer timeToLive   = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE);
+
+        // Do not commit, if message is marked for rollback
+        if (Boolean.TRUE.equals(rollbackOnly)) {
+            jtaCommit = Boolean.FALSE;
+        }
+
+        if (persistent != null) {
+            // honour the requested value: previously any non-null value, including FALSE,
+            // switched the producer to PERSISTENT delivery
+            String mode = persistent ? "PERSISTENT" : "NON_PERSISTENT";
+            try {
+                producer.setDeliveryMode(
+                    persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            } catch (JMSException e) {
+                handleException("Error setting JMS Producer for " + mode + " delivery", e);
+            }
+        }
+        if (priority != null) {
+            try {
+                producer.setPriority(priority);
+            } catch (JMSException e) {
+                handleException("Error setting JMS Producer priority to : " + priority, e);
+            }
+        }
+        if (timeToLive != null) {
+            try {
+                producer.setTimeToLive(timeToLive);
+            } catch (JMSException e) {
+                handleException("Error setting JMS Producer TTL to : " + timeToLive, e);
+            }
+        }
+
+        boolean sendingSuccessful = false;
+        // perform actual message sending
+        try {
+            if (jmsSpec11 || isQueue == null) {
+                // JMS 1.1, or destination kind unknown: use the unified producer API
+                producer.send(message);
+
+            } else {
+                // JMS 1.0.2b: the domain specific API must be used
+                if (isQueue) {
+                    ((QueueSender) producer).send(message);
+
+                } else {
+                    ((TopicPublisher) producer).publish(message);
+                }
+            }
+
+            // set the actual MessageID to the message context for use by any others down the line
+            String msgId = null;
+            try {
+                msgId = message.getJMSMessageID();
+                if (msgId != null) {
+                    msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
+                }
+            } catch (JMSException ignore) {}
+
+            sendingSuccessful = true;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Sent Message Context ID : " + msgCtx.getMessageID() +
+                    " with JMS Message ID : " + msgId +
+                    " to destination : " + producer.getDestination());
+            }
+
+        } catch (JMSException e) {
+            // NOTE(review): the failure is only logged, so the caller cannot tell that the send
+            // did not happen - confirm whether this should be rethrown via handleException
+            log.error("Error sending message with MessageContext ID : " +
+                msgCtx.getMessageID() + " to destination : " + destination, e);
+
+        } finally {
+
+            if (jtaCommit != null) {
+
+                UserTransaction ut =
+                    (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION);
+                // NOTE(review): when no UserTransaction is present nothing is committed or rolled
+                // back on this path, not even a transacted JMS Session - confirm this is intended
+                if (ut != null) {
+
+                    // commit only a successful send that was not marked for rollback
+                    boolean commit = sendingSuccessful && jtaCommit;
+                    try {
+                        if (commit) {
+                            ut.commit();
+                        } else {
+                            ut.rollback();
+                        }
+                        msgCtx.removeProperty(BaseConstants.USER_TRANSACTION);
+
+                        if (log.isDebugEnabled()) {
+                            // report what was actually done, not merely whether the send worked
+                            log.debug((commit ? "Committed" : "Rolled back") +
+                                " JTA Transaction");
+                        }
+
+                    } catch (Exception e) {
+                        handleException("Error committing/rolling back JTA transaction after " +
+                            "sending of message with MessageContext ID : " + msgCtx.getMessageID() +
+                            " to destination : " + destination, e);
+                    }
+                }
+
+            } else {
+                try {
+                    if (session.getTransacted()) {
+                        boolean commit =
+                            sendingSuccessful && (rollbackOnly == null || !rollbackOnly);
+                        if (commit) {
+                            session.commit();
+                        } else {
+                            session.rollback();
+                        }
+
+                        // logged only when there really was a local transaction to complete
+                        if (log.isDebugEnabled()) {
+                            log.debug((commit ? "Committed" : "Rolled back") +
+                                " local (JMS Session) Transaction");
+                        }
+                    }
+
+                } catch (JMSException e) {
+                    handleException("Error committing/rolling back local (i.e. session) " +
+                        "transaction after sending of message with MessageContext ID : " +
+                        msgCtx.getMessageID() + " to destination : " + destination, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Close non-shared producer, session and connection if any.
+     * <p>
+     * Each resource is closed only when the cache level of this sender is below the level at
+     * which that resource is cached, and only when the resource is actually present. The
+     * resources may be null, e.g. when this sender was created without a JMSConnectionFactory
+     * or when null was passed to one of the setters. A failure to close one resource is logged
+     * and does not prevent the remaining ones from being closed.
+     */
+    public void close() {
+        if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) {
+            try {
+                producer.close();
+            } catch (JMSException e) {
+                log.error("Error closing JMS MessageProducer after send", e);
+            }
+        }
+
+        if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                log.error("Error closing JMS Session after send", e);
+            }
+        }
+
+        if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+                log.error("Error closing JMS Connection after send", e);
+            }
+        }
+    }
+
+    /**
+     * Logs the given failure at error level and reports it to the caller as an
+     * AxisJMSException which wraps the original cause.
+     *
+     * @param description the description of what went wrong
+     * @param cause the underlying exception
+     * @throws AxisJMSException always
+     */
+    private void handleException(String description, Exception cause) {
+        log.error(description, cause);
+        throw new AxisJMSException(description, cause);
+    }
+
+    /**
+     * Reads a message context property as a Boolean.
+     *
+     * @param msgCtx the message context holding the property
+     * @param name the name of the property
+     * @return the value when it is a Boolean, the result of Boolean.valueOf when it is a
+     *         String, or null when the property is absent or of any other type
+     */
+    private Boolean getBooleanProperty(MessageContext msgCtx, String name) {
+        final Object value = msgCtx.getProperty(name);
+        // instanceof is false for null, so an absent property falls through to the end
+        if (value instanceof Boolean) {
+            return (Boolean) value;
+        }
+        if (value instanceof String) {
+            return Boolean.valueOf((String) value);
+        }
+        return null;
+    }
+
+    /**
+     * Reads a message context property as an Integer.
+     *
+     * @param msgCtx the message context holding the property
+     * @param name the name of the property
+     * @return the value when it is an Integer, the parsed value when it is a String, or null
+     *         when the property is absent or of any other type
+     * @throws NumberFormatException if the property is a String which is not a valid integer
+     */
+    private Integer getIntegerProperty(MessageContext msgCtx, String name) {
+        final Object value = msgCtx.getProperty(name);
+        // instanceof is false for null, so an absent property falls through to the end
+        if (value instanceof Integer) {
+            return (Integer) value;
+        }
+        if (value instanceof String) {
+            return Integer.valueOf((String) value);
+        }
+        return null;
+    }
+
+    /** @return the JMS Connection used to send out */
+    public Connection getConnection() {
+        return connection;
+    }
+
+    /** @param connection the JMS Connection to be used to send out */
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    /** @return the JMS Session used to send out */
+    public Session getSession() {
+        return session;
+    }
+
+    /** @param session the JMS Session to be used to send out */
+    public void setSession(Session session) {
+        this.session = session;
+    }
+
+    /** @return the MessageProducer used */
+    public MessageProducer getProducer() {
+        return producer;
+    }
+
+    /** @param producer the MessageProducer to be used */
+    public void setProducer(MessageProducer producer) {
+        this.producer = producer;
+    }
+
+    /** @return the cache level, which decides what close() releases */
+    public int getCacheLevel() {
+        return cacheLevel;
+    }
+
+    /** @param cacheLevel the cache level, which decides what close() releases */
+    public void setCacheLevel(int cacheLevel) {
+        this.cacheLevel = cacheLevel;
+    }
+}
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=717873&r1=717872&r2=717873&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Sat Nov 15 08:11:11 2008
@@ -41,8 +41,8 @@
/** The naming context */
private Context context;
/**
- * this is a reference to the underlying JMS connection factory when sending messages
- * through connection factories not defined to the transport sender
+ * this is a reference to the underlying JMS ConnectionFactory when sending messages
+ * through connection factories not defined at the TransportSender level
*/
private ConnectionFactory connectionFactory = null;
/**
@@ -53,14 +53,18 @@
private JMSConnectionFactory jmsConnectionFactory = null;
/** the Destination queue or topic for the outgoing message */
private Destination destination = null;
- /** the Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
- private String destinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+ /** the type of the Destination (queue or topic) for the outgoing message
+ * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+ */
+ private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
/** the Reply Destination queue or topic for the outgoing message */
private Destination replyDestination = null;
/** the Reply Destination name */
private String replyDestinationName = null;
- /** the Reply Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
- private String replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+ /** the type of the Reply Destination (queue or topic) for the outgoing message
+ * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+ */
+ private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
/** the EPR properties when the out-transport info is generated from a target EPR */
private Hashtable<String,String> properties = null;
/** the target EPR string where applicable */
@@ -68,13 +72,12 @@
/** the message property name that stores the content type of the outgoing message */
private String contentTypeProperty;
- private String contentType = null;
-
/**
* Creates an instance using the given JMS connection factory and destination
*
* @param jmsConnectionFactory the JMS connection factory
* @param dest the destination
+ * @param contentTypeProperty the message property name that stores the content type of the outgoing message
*/
JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest,
String contentTypeProperty) {
@@ -91,26 +94,31 @@
* @param targetEPR the target EPR
*/
JMSOutTransportInfo(String targetEPR) {
+
this.targetEPR = targetEPR;
if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) {
handleException("Invalid prefix for a JMS EPR : " + targetEPR);
+
} else {
properties = BaseUtils.getEPRProperties(targetEPR);
- String destinationType = properties.get(JMSConstants.DEST_PARAM_TYPE);
- if(destinationType != null) {
+ String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE);
+ if (destinationType != null) {
setDestinationType(destinationType);
}
- String replyDestinationType = properties.get(JMSConstants.REPLY_PARAM_TYPE);
- if(replyDestinationType != null) {
+
+ String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE);
+ if (replyDestinationType != null) {
setReplyDestinationType(replyDestinationType);
}
- replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
+
+ replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
try {
context = new InitialContext(properties);
} catch (NamingException e) {
handleException("Could not get an initial context using " + properties, e);
}
+
destination = getDestination(context, targetEPR);
replyDestination = getReplyDestination(context, targetEPR);
}
@@ -136,7 +144,7 @@
private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) {
try {
- String conFacJndiName = props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM);
+ String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME);
if (conFacJndiName != null) {
return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName);
} else {
@@ -177,10 +185,11 @@
* @return the JMS destination, or null if it does not exist
*/
private Destination getReplyDestination(Context context, String url) {
- String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
+ String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
if(replyDestinationName == null) {
return null;
}
+
try {
return JMSUtils.lookup(context, Destination.class, replyDestinationName);
} catch (NameNotFoundException e) {
@@ -190,13 +199,14 @@
} catch (NamingException e) {
handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e);
}
+
return null;
}
/**
* Look up for the given destination
- * @param replyDest
- * @return
+ * @param replyDest the JNDI name to lookup Destination required
+ * @return Destination for the JNDI name passed
*/
public Destination getReplyDestination(String replyDest) {
try {
@@ -236,7 +246,7 @@
}
public void setContentType(String contentType) {
- this.contentType = contentType;
+ // intentionally a no-op: this method is imposed by the Axis2 OutTransportInfo interface, and the content type is not stored here
}
public Hashtable<String,String> getProperties() {
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=717873&r1=717872&r2=717873&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Sat Nov 15 08:11:11 2008
@@ -20,6 +20,7 @@
import org.apache.axiom.om.OMText;
import org.apache.axiom.om.OMNode;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.TransportOutDescription;
@@ -29,11 +30,9 @@
import org.apache.axis2.transport.base.*;
import org.apache.axis2.transport.base.streams.WriterOutputStream;
import org.apache.axis2.transport.http.HTTPConstants;
-import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.activation.DataHandler;
-import javax.naming.Context;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
@@ -45,34 +44,23 @@
*/
public class JMSSender extends AbstractTransportSender implements ManagementSupport {
- public static final String TRANSPORT_NAME = "jms";
-
- private JMSConnectionFactoryManager connFacManager;
+ public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
- public JMSSender() {
- log = LogFactory.getLog(JMSSender.class);
- }
+ /** The JMS connection factory manager to be used when sending messages out */
+ private JMSConnectionFactoryManager connFacManager;
/**
* Initialize the transport sender by reading pre-defined connection factories for
- * outgoing messages. These will create sessions (one per each destination dealth with)
- * to be used when messages are being sent.
+ * outgoing messages.
+ *
* @param cfgCtx the configuration context
* @param transportOut the transport sender definition from axis2.xml
* @throws AxisFault on error
*/
public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
super.init(cfgCtx, transportOut);
- connFacManager = new JMSConnectionFactoryManager(cfgCtx);
- // read the connection factory definitions and create them
- connFacManager.loadConnectionFactoryDefinitions(transportOut);
- connFacManager.start();
- }
-
- @Override
- public void stop() {
- connFacManager.stop();
- super.stop();
+ connFacManager = new JMSConnectionFactoryManager(transportOut);
+ log.info("JMS Transport Sender initialized...");
}
/**
@@ -85,7 +73,7 @@
private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
Map<String,String> props = trpInfo.getProperties();
if(trpInfo.getProperties() != null) {
- String jmsConnectionFactoryName = props.get(JMSConstants.CONFAC_PARAM);
+ String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
if(jmsConnectionFactoryName != null) {
return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
} else {
@@ -103,58 +91,87 @@
OutTransportInfo outTransportInfo) throws AxisFault {
JMSConnectionFactory jmsConnectionFactory = null;
- Connection connection = null; // holds a one time connection if used
- JMSOutTransportInfo jmsOut;
- Session session = null;
- Destination replyDestination = null;
+ JMSOutTransportInfo jmsOut = null;
+ JMSMessageSender messageSender = null;
- try {
- if (targetAddress != null) {
+ if (targetAddress != null) {
- jmsOut = new JMSOutTransportInfo(targetAddress);
- // do we have a definition for a connection factory to use for this address?
- jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+ jmsOut = new JMSOutTransportInfo(targetAddress);
+ // do we have a definition for a connection factory to use for this address?
+ jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+
+ if (jmsConnectionFactory != null) {
+ messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
- if (jmsConnectionFactory != null) {
- // create new or get existing session to send to the destination from the CF
- session = jmsConnectionFactory.getSessionForDestination(
- JMSUtils.getDestination(targetAddress));
+ } else {
+ try {
+ messageSender = JMSUtils.createJMSSender(jmsOut);
+ } catch (JMSException e) {
+ handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+ }
+ }
- } else {
- // digest the targetAddress and locate CF from the EPR
- jmsOut.loadConnectionFactoryFromProperies();
- try {
- // create a one time connection and session to be used
- Hashtable<String,String> jndiProps = jmsOut.getProperties();
- connection = JMSUtils.createConnection(jmsOut.getConnectionFactory(),
- jndiProps.get(Context.SECURITY_PRINCIPAL),
- jndiProps.get(Context.SECURITY_CREDENTIALS),
- jmsOut.getDestinationType());
+ } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
+
+ jmsOut = (JMSOutTransportInfo) outTransportInfo;
+ try {
+ messageSender = JMSUtils.createJMSSender(jmsOut);
+ } catch (JMSException e) {
+ handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+ }
+ }
- session = JMSUtils.createSession(connection, false,
- Session.AUTO_ACKNOWLEDGE, jmsOut.getDestinationType());
+ // The message property to be used to send the content type is determined by
+ // the out transport info (i.e. either from the EPR if we are sending a request,
+ // or, if we are sending a response, from the configuration of the service that
+ // received the request). The property name can be overridden by a message
+ // context property.
+ String contentTypeProperty =
+ (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+ if (contentTypeProperty == null) {
+ contentTypeProperty = jmsOut.getContentTypeProperty();
+ }
- } catch (JMSException e) {
- handleException("Error creating a connection/session for : " + targetAddress, e);
- }
- }
- replyDestination = jmsOut.getReplyDestination();
+ if (messageSender.getCacheLevel() < JMSConstants.CACHE_SESSION) {
+ // only connection has been cached at most
+ sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
+
+ } else {
+ // need to synchronize as Sessions are not thread safe
+ synchronized (messageSender.getSession()) {
+ sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
+ }
+ }
+ }
- } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
+ /**
+ * Perform actual sending of the JMS message
+ */
+ private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
+ String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
+ JMSOutTransportInfo jmsOut) throws AxisFault {
+
+ // convert the axis message context into a JMS Message that we can send over JMS
+ Message message = null;
+ String correlationId = null;
+ try {
+ message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
+ } catch (JMSException e) {
+ handleException("Error creating a JMS message from the message context", e);
+ }
- jmsOut = (JMSOutTransportInfo) outTransportInfo;
- jmsConnectionFactory = jmsOut.getJmsConnectionFactory();
+ // should we wait for a synchronous response on this same thread?
+ boolean waitForResponse = waitForSynchronousResponse(msgCtx);
+ Destination replyDestination = jmsOut.getReplyDestination();
- session = jmsConnectionFactory.getSessionForDestination(
- jmsOut.getDestination().toString());
- } else {
- handleException("Unable to get JMSOutTransportInfo");
- return; // We never get here. Just make the compiler happy.
- }
-
- Destination destination = jmsOut.getDestination();
+ // if this is a synchronous out-in, prepare to listen on the response destination
+ if (waitForResponse) {
String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
+ if (replyDestName == null && jmsConnectionFactory != null) {
+ replyDestName = jmsConnectionFactory.getReplyToDestination();
+ }
+
if (replyDestName != null) {
if (jmsConnectionFactory != null) {
replyDestination = jmsConnectionFactory.getDestination(replyDestName);
@@ -162,107 +179,45 @@
replyDestination = jmsOut.getReplyDestination(replyDestName);
}
}
+ replyDestination = JMSUtils.setReplyDestination(
+ replyDestination, messageSender.getSession(), message);
+ }
- if(session == null) {
- handleException("Could not create JMS session");
- }
-
- // now we are going to use the JMS session, but if this was a session from a
- // defined JMS connection factory, we need to synchronize as sessions are not
- // thread safe
- synchronized(session) {
- // The message property to be used to send the content type is determined by
- // the out transport info (i.e. either from the EPR if we are sending a request,
- // or, if we are sending a response, from the configuration of the service that
- // received the request). The property name can be overridden by a message
- // context property.
- String contentTypeProperty =
- (String)msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
- if (contentTypeProperty == null) {
- contentTypeProperty = jmsOut.getContentTypeProperty();
- }
-
- // convert the axis message context into a JMS Message that we can send over JMS
- Message message = null;
- String correlationId = null;
- try {
- message = createJMSMessage(msgCtx, session, contentTypeProperty);
- } catch (JMSException e) {
- handleException("Error creating a JMS message from the axis message context", e);
- }
-
- String destinationType = jmsOut.getDestinationType();
-
- // if the destination does not exist, see if we can create it
- destination = JMSUtils.createDestinationIfRequired(
- destination, destinationType, targetAddress, session);
-
- if(jmsOut.getReplyDestinationName() != null) {
- replyDestination = JMSUtils.createReplyDestinationIfRequired(
- replyDestination, jmsOut.getReplyDestinationName(),
- jmsOut.getReplyDestinationType(), targetAddress, session);
- }
-
- // should we wait for a synchronous response on this same thread?
- boolean waitForResponse = waitForSynchronousResponse(msgCtx);
-
- // if this is a synchronous out-in, prepare to listen on the response destination
- if (waitForResponse) {
- replyDestination = JMSUtils.setReplyDestination(
- replyDestination, session, message);
- }
+ try {
+ messageSender.send(message, msgCtx);
+ metrics.incrementMessagesSent();
- // send the outgoing message over JMS to the destination selected
- try {
- JMSUtils.sendMessageToJMSDestination(session, destination, destinationType, message);
+ } catch (AxisJMSException e) {
+ metrics.incrementFaultsSending();
+ handleException("Error sending JMS message", e);
+ }
- // set the actual MessageID to the message context for use by any others
- try {
- String msgId = message.getJMSMessageID();
- if (msgId != null) {
- msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
- }
- } catch (JMSException ignore) {}
+ try {
+ metrics.incrementBytesSent(JMSUtils.getMessageSize(message));
+ } catch (JMSException e) {
+ log.warn("Error reading JMS message size to update transport metrics", e);
+ }
- metrics.incrementMessagesSent();
- try {
- metrics.incrementBytesSent(JMSUtils.getMessageSize(message));
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
- } catch (BaseTransportException e) {
- metrics.incrementFaultsSending();
- throw e;
- }
+ // if we are expecting a synchronous response back for the message sent out
+ if (waitForResponse) {
+ // TODO ********************************************************************************
+ // TODO **** replace with asynchronous polling via a poller task to process this *******
+ // information would be given. Then it should poll (until timeout) the
+ // requested destination for the response message and inject it from an
+ // asynchronous worker thread
+ try {
+ messageSender.getConnection().start(); // multiple calls are safely ignored
+ } catch (JMSException ignore) {}
- // if we are expecting a synchronous response back for the message sent out
- if (waitForResponse) {
- if (connection != null) {
- try {
- connection.start();
- } catch (JMSException ignore) {}
- } else {
- // If connection is null, we are using a cached session and the underlying
- // connection is already started. Thus, there is nothing to do here.
- }
- try {
- correlationId = message.getJMSMessageID();
- } catch(JMSException ignore) {}
-
- // We assume here that the response uses the same message property to
- // specify the content type of the message.
- waitForResponseAndProcess(session, replyDestination,
- jmsOut.getReplyDestinationType(), msgCtx, correlationId,
- contentTypeProperty);
- }
- }
+ try {
+ correlationId = message.getJMSMessageID();
+ } catch(JMSException ignore) {}
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException ignore) {}
- }
+ // We assume here that the response uses the same message property to
+ // specify the content type of the message.
+ waitForResponseAndProcess(messageSender.getSession(), replyDestination,
+ msgCtx, correlationId, contentTypeProperty);
+ // TODO ********************************************************************************
}
}
@@ -278,17 +233,13 @@
* @throws AxisFault on error
*/
private void waitForResponseAndProcess(Session session, Destination replyDestination,
- String replyDestinationType, MessageContext msgCtx, String correlationId,
+ MessageContext msgCtx, String correlationId,
String contentTypeProperty) throws AxisFault {
try {
MessageConsumer consumer;
- if (correlationId != null) {
- consumer = JMSUtils.createConsumer(session, replyDestination,
- "JMSCorrelationID = '" + correlationId + "'");
- } else {
- consumer = JMSUtils.createConsumer(session, replyDestination);
- }
+ consumer = JMSUtils.createConsumer(session, replyDestination,
+ "JMSCorrelationID = '" + correlationId + "'");
// how long are we willing to wait for the sync response
long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
@@ -332,8 +283,9 @@
} catch (JMSException e) {
metrics.incrementFaultsReceiving();
- handleException("Error creating consumer or receiving reply to : " +
- replyDestination, e);
+ handleException("Error creating a consumer, or receiving a synchronous reply " +
+ "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
+ " and reply Destination : " + replyDestination, e);
}
}