You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/12/08 19:15:41 UTC
svn commit: r724432 [3/4] - in
/webservices/commons/trunk/modules/transport/modules:
base/src/main/java/org/apache/axis2/transport/base/ jms/
jms/src/main/java/org/apache/axis2/transport/jms/
testkit/src/main/java/org/apache/axis2/transport/testkit/tes...
Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Mon Dec 8 10:15:40 2008
@@ -25,13 +25,14 @@
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterIncludeImpl;
import org.apache.axis2.format.TextMessageBuilder;
import org.apache.axis2.format.TextMessageBuilderAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.base.BaseUtils;
+import org.apache.axis2.transport.base.BaseConstants;
+import org.apache.axis2.transport.base.threads.WorkerPool;
import javax.jms.*;
import javax.jms.Queue;
@@ -55,58 +56,6 @@
private static final Object[] NOPARMS = new Object[] {};
/**
- * Create a JMS Queue using the given connection with the JNDI destination name, and return the
- * JMS Destination name of the created queue
- *
- * @param con the JMS Connection to be used
- * @param destinationJNDIName the JNDI name of the Queue to be created
- * @return the JMS Destination name of the created Queue
- * @throws JMSException on error
- */
- public static String createJMSQueue(Connection con, String destinationJNDIName)
- throws JMSException {
-
- try {
- QueueSession session
- = ((QueueConnection) con).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(destinationJNDIName);
- log.info("JMS Queue with JNDI name : " + destinationJNDIName + " created");
- return queue.getQueueName();
-
- } finally {
- try {
- con.close();
- } catch (JMSException ignore) {}
- }
- }
-
- /**
- * Create a JMS Topic using the given connection with the JNDI destination name, and return the
- * JMS Destination name of the created queue
- *
- * @param con the JMS Connection to be used
- * @param destinationJNDIName the JNDI name of the Topic to be created
- * @return the JMS Destination name of the created Topic
- * @throws JMSException on error
- */
- public static String createJMSTopic(Connection con, String destinationJNDIName)
- throws JMSException {
-
- try {
- TopicSession session
- = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(destinationJNDIName);
- log.info("JMS Topic with JNDI name : " + destinationJNDIName + " created");
- return topic.getTopicName();
-
- } finally {
- try {
- con.close();
- } catch (JMSException ignore) {}
- }
- }
-
- /**
* Should this service be enabled over the JMS transport?
*
* @param service the Axis service
@@ -131,26 +80,43 @@
* Get the EPR for the given JMS connection factory and destination
* the form of the URL is
* jms:/<destination>?[<key>=<value>&]*
+ * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS
+ * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered
*
- * @param cf the Axis2 JMS connection factory
- * @param destination the JNDI name of the destination
+ * @param cf the Axis2 JMS connection factory
+ * @param destinationType the type of destination
+ * @param endpoint JMSEndpoint
* @return the EPR as a String
*/
- // TODO: duplicate code (see JMSConnectionFactory#getEPRForDestination)
- static String getEPR(JMSConnectionFactory cf, JMSEndpoint endpoint) {
+ static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) {
StringBuffer sb = new StringBuffer();
- sb.append(JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName());
- sb.append("?").append(JMSConstants.DEST_PARAM_TYPE);
- sb.append("=").append(endpoint.getDestinationType());
- for (Map.Entry<String,String> entry : cf.getJndiProperties().entrySet()) {
- sb.append("&").append(entry.getKey()).append("=").append(entry.getValue());
- }
- String contentTypeProperty = endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty();
- if (contentTypeProperty != null) {
- sb.append("&");
- sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
- sb.append("=");
- sb.append(contentTypeProperty);
+
+ sb.append(
+ JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName());
+ sb.append("?").
+ append(JMSConstants.PARAM_DEST_TYPE).append("=").append(
+ destinationType == JMSConstants.TOPIC ?
+ JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE);
+
+ if (endpoint.getContentTypeRuleSet() != null) {
+ String contentTypeProperty =
+ endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty();
+ if (contentTypeProperty != null) {
+ sb.append("&");
+ sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+ sb.append("=");
+ sb.append(contentTypeProperty);
+ }
+ }
+
+ for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) {
+ if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) &&
+ !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) &&
+ !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) &&
+ !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) {
+ sb.append("&").append(
+ entry.getKey()).append("=").append(entry.getValue());
+ }
}
return sb.toString();
}
@@ -188,59 +154,16 @@
}
/**
- * Set JNDI properties and any other connection factory parameters to the connection factory
- * passed in, looking at the parameter in axis2.xml
- * @param param the axis parameter that holds the connection factory settings
- * @param jmsConFactory the JMS connection factory to which the parameters should be applied
+ * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in
+ * @param message the JMS message read
+ * @param msgContext the Axis2 MessageContext to be populated
+ * @param contentType content type for the message
+ * @throws AxisFault
+ * @throws JMSException
*/
- public static void setConnectionFactoryParameters(
- Parameter param, JMSConnectionFactory jmsConFactory) {
-
- ParameterIncludeImpl pi = new ParameterIncludeImpl();
- try {
- pi.deserializeParameters((OMElement) param.getValue());
- } catch (AxisFault axisFault) {
- log.error("Error reading parameters for JMS connection factory" +
- jmsConFactory.getName(), axisFault);
- }
-
- for (Object o : pi.getParameters()) {
-
- Parameter p = (Parameter) o;
-
- if (JMSConstants.CONFAC_TYPE.equals(p.getName())) {
- String connectionFactoryType = (String) p.getValue();
- jmsConFactory.setConnectionFactoryType(connectionFactoryType);
-
- } else if (JMSConstants.RECONNECT_TIMEOUT.equals(p.getName())) {
- String strTimeout = (String) p.getValue();
- int reconnectTimeoutSeconds = Integer.parseInt(strTimeout);
- long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000;
- jmsConFactory.setReconnectTimeout(reconnectTimeoutMillis);
-
- } else if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) {
- jmsConFactory.addJNDIContextProperty(
- Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue());
- } else if (Context.PROVIDER_URL.equals(p.getName())) {
- jmsConFactory.addJNDIContextProperty(
- Context.PROVIDER_URL, (String) p.getValue());
- } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) {
- jmsConFactory.addJNDIContextProperty(
- Context.SECURITY_PRINCIPAL, (String) p.getValue());
- } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) {
- jmsConFactory.addJNDIContextProperty(
- Context.SECURITY_CREDENTIALS, (String) p.getValue());
- } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) {
- jmsConFactory.setConnFactoryJNDIName((String) p.getValue());
- jmsConFactory.addJNDIContextProperty(
- JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue());
- } else {
- jmsConFactory.addJNDIContextProperty( p.getName(), (String) p.getValue());
- }
- }
- }
+ public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType)
+ throws AxisFault, JMSException {
- public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) throws AxisFault, JMSException {
if (message instanceof BytesMessage) {
if (contentType == null) {
log.debug("No content type specified; assuming application/octet-stream.");
@@ -268,6 +191,7 @@
return; // Make compiler happy
}
msgContext.setEnvelope(envelope);
+
} else if (message instanceof TextMessage) {
String type;
if (contentType == null) {
@@ -288,6 +212,7 @@
}
builder = new SOAPBuilder();
}
+
TextMessageBuilder textMessageBuilder;
if (builder instanceof TextMessageBuilder) {
textMessageBuilder = (TextMessageBuilder)builder;
@@ -312,6 +237,7 @@
*/
public static Destination setReplyDestination(Destination replyDestination, Session session,
Message message) {
+
if (replyDestination == null) {
try {
// create temporary queue to receive the reply
@@ -340,116 +266,6 @@
}
/**
- * When trying to send a message to a destination, if it does not exist, try to create it
- *
- * @param destination the JMS destination to send messages
- * @param destinationType type of the destination (can be a queue or a topic)
- * @param targetAddress the target JMS EPR to find the Destination to be created if required
- * @param session the JMS session to use
- * @return the JMS Destination where messages could be posted
- * @throws AxisFault if the target Destination does not exist and cannot be created
- */
- public static Destination createDestinationIfRequired(Destination destination,
- String destinationType, String targetAddress, Session session) throws AxisFault {
-
- if (destination == null) {
- if (targetAddress != null) {
- String name = JMSUtils.getDestination(targetAddress);
- if (log.isDebugEnabled()) {
- log.debug("Creating JMS Destination : " + name);
- }
-
- try {
- destination = createDestination(session, name, destinationType);
- } catch (JMSException e) {
- handleException("Error creating destination Queue : " + name, e);
- }
- } else {
- handleException("Cannot send reply to null JMS Destination");
- }
- }
- return destination;
- }
-
- /**
- * If reply destination does not exist, try to create it
- *
- * @param destination the destination queue or topic
- * @param replyDestinationName name of the reply destination queue or topic
- * @param destinationType type of the destination (can be queue or topic)
- * @param targetAddress target address of the queue or topic
- * @param session JMS session with the message to be sent
- * @return destination created if the destination is null or the destination otherwise
- * @throws org.apache.axis2.AxisFault in case of an error in creating the destination
- */
- public static Destination createReplyDestinationIfRequired(Destination destination,
- String replyDestinationName, String destinationType, String targetAddress, Session session)
- throws AxisFault {
-
- if (destination == null) {
- if (targetAddress != null) {
- if (log.isDebugEnabled()) {
- log.debug("Creating JMS Reply Destination : " + replyDestinationName);
- }
-
- try {
- destination = createDestination(session, replyDestinationName, destinationType);
- } catch (JMSException e) {
- handleException("Error creating reply destination : "
- + replyDestinationName, e);
- }
- } else {
- handleException("Cannot send reply to null reply JMS Destination");
- }
- }
- return destination;
- }
-
- /**
- * Send the given message to the Destination using the given session
- *
- * @param session the session to use to send
- * @param destination the Destination
- * @param destinationType type of the destination (can be a queue or a topic)
- * @param message the JMS Message
- * @throws AxisFault on error
- */
- public static void sendMessageToJMSDestination(Session session,
- Destination destination, String destinationType, Message message) throws AxisFault {
-
- MessageProducer producer = null;
- try {
- if (log.isDebugEnabled()) {
- log.debug("Sending message to destination : " + destination);
- }
-
- if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType)) {
- producer = ((TopicSession) session).createPublisher((Topic) destination);
- ((TopicPublisher) producer).publish(message);
- } else {
- producer = ((QueueSession) session).createSender((Queue) destination);
- producer.send(message);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Sent message to destination : " + destination +
- "\nMessage ID : " + message.getJMSMessageID() +
- "\nCorrelation ID : " + message.getJMSCorrelationID() +
- "\nReplyTo ID : " + message.getJMSReplyTo());
- }
-
- } catch (JMSException e) {
- handleException("Error creating a producer or sending to : " + destination, e);
- } finally {
- if (producer != null) {
- try {
- producer.close();
- } catch (JMSException ignore) {}
- }
- }
- }
-
- /**
* Set transport headers from the axis message context, into the JMS message
*
* @param msgContext the axis message context
@@ -485,19 +301,21 @@
} else {
log.warn("Invalid delivery mode ignored : " + o);
}
+
} else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
message.setJMSExpiration(
- Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
+ Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
} else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID));
} else if (JMSConstants.JMS_PRIORITY.equals(name)) {
message.setJMSPriority(
- Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
+ Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
} else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
message.setJMSTimestamp(
- Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
+ Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
} else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE));
+
} else {
Object value = headerMap.get(name);
if (value instanceof String) {
@@ -644,80 +462,14 @@
}
- // ----------- JMS 1.0.2b compatibility methods -------------
- public static Connection createConnection(ConnectionFactory conFactory, String user,
- String pass, String destinationType) throws JMSException {
-
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) {
- if (user != null && pass != null) {
- return ((QueueConnectionFactory) conFactory).createQueueConnection(user, pass);
- } else {
- return ((QueueConnectionFactory) conFactory).createQueueConnection();
- }
-
- } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
- if (user != null && pass != null) {
- return ((TopicConnectionFactory) conFactory).createTopicConnection(user, pass);
- } else {
- return ((TopicConnectionFactory) conFactory).createTopicConnection();
- }
- } else {
- handleException("Unable to determine type of JMS Connection Factory - i.e Queue/Topic");
- }
- return null;
- }
-
- public static Session createSession(Connection con,
- boolean transacted, int acknowledgeMode, String destinationType) throws JMSException {
-
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) {
- return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
- } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
- return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode);
- } else {
- log.debug("JMS destination type not given or invalid, was '" + destinationType +
- "'. Taking the default value as queue");
- return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
- }
- }
-
- public static Destination createDestination(Session session, String destName,
- String destinationType) throws JMSException {
-
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
- return session.createQueue(destName);
- } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
- return session.createTopic(destName);
- } else {
- log.debug("JMS destination type not given or invalid, was '" + destinationType +
- "'. Taking the default value as queue");
- return session.createQueue(destName);
- }
- }
-
- public static void createDestination(ConnectionFactory conFactory,
- String destinationJNDIName, String destinationType) throws JMSException {
-
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
- JMSUtils.createJMSQueue(
- ((QueueConnectionFactory) conFactory).createQueueConnection(),
- destinationJNDIName);
- } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
- JMSUtils.createJMSTopic(
- ((TopicConnectionFactory) conFactory).createTopicConnection(),
- destinationJNDIName);
- }
- }
- public static MessageConsumer createConsumer(Session session, Destination dest)
- throws JMSException {
-
- if (dest instanceof Queue) {
- return ((QueueSession) session).createReceiver((Queue) dest);
- } else {
- return ((TopicSession) session).createSubscriber((Topic) dest);
- }
- }
-
+ /**
+ * Create a MessageConsumer for the given Destination
+ * @param session JMS Session to use
+ * @param dest Destination for which the Consumer is to be created
+ * @param messageSelector the message selector to be used if any
+ * @return a MessageConsumer for the specified Destination
+ * @throws JMSException
+ */
public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector)
throws JMSException {
@@ -728,6 +480,13 @@
}
}
+ /**
+ * Create a temp queue or topic for synchronous receipt of responses, when a reply destination
+ * is not specified
+ * @param session the JMS Session to use
+ * @return a temporary Queue or Topic, depending on the session
+ * @throws JMSException
+ */
public static Destination createTemporaryDestination(Session session) throws JMSException {
if (session instanceof QueueSession) {
@@ -737,6 +496,11 @@
}
}
+ /**
+ * Return the body length in bytes for a bytes message
+ * @param bMsg the JMS BytesMessage
+ * @return length of body in bytes
+ */
public static long getBodyLength(BytesMessage bMsg) {
try {
Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS);
@@ -762,7 +526,13 @@
} catch (JMSException ignore) {}
return length;
}
-
+
+ /**
+ * Get the length of the message in bytes
+ * @param message
+ * @return message size (or approximation) in bytes
+ * @throws JMSException
+ */
public static long getMessageSize(Message message) throws JMSException {
if (message instanceof BytesMessage) {
return JMSUtils.getBodyLength((BytesMessage) message);
@@ -799,4 +569,525 @@
}
}
}
+
+ /**
+ * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory
+ * @param jcf
+ * @param service
+ * @param workerPool
+ * @return
+ */
+ public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf,
+ AxisService service, WorkerPool workerPool) {
+
+ String name = service.getName();
+ Map<String, String> svc = getServiceStringParameters(service.getParameters());
+ Map<String, String> cf = jcf.getParameters();
+
+ ServiceTaskManager stm = new ServiceTaskManager();
+
+ stm.setServiceName(name);
+ stm.addJmsProperties(cf);
+ stm.addJmsProperties(svc);
+
+ stm.setConnFactoryJNDIName(
+ getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf));
+ String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf);
+ if (destName == null) {
+ destName = service.getName();
+ }
+ stm.setDestinationJNDIName(destName);
+ stm.setDestinationType(getDestinationType(svc, cf));
+
+ stm.setJmsSpec11(
+ getJMSSpecVersion(svc, cf));
+ stm.setTransactionality(
+ getTransactionality(svc, cf));
+ stm.setCacheUserTransaction(
+ getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf));
+ stm.setUserTransactionJNDIName(
+ getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf));
+ stm.setSessionTransacted(
+ getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf));
+ stm.setSessionAckMode(
+ getSessionAck(svc, cf));
+ stm.setMessageSelector(
+ getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf));
+ stm.setSubscriptionDurable(
+ getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf));
+ stm.setDurableSubscriberName(
+ getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf));
+
+ stm.setCacheLevel(
+ getCacheLevel(svc, cf));
+ stm.setPubSubNoLocal(
+ getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf));
+
+ Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf);
+ if (value != null) {
+ stm.setReceiveTimeout(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf);
+ if (value != null) {
+ stm.setConcurrentConsumers(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf);
+ if (value != null) {
+ stm.setMaxConcurrentConsumers(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf);
+ if (value != null) {
+ stm.setIdleTaskExecutionLimit(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf);
+ if (value != null) {
+ stm.setMaxMessagesPerTask(value);
+ }
+
+ value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf);
+ if (value != null) {
+ stm.setInitialReconnectDuration(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf);
+ if (value != null) {
+ stm.setMaxReconnectDuration(value);
+ }
+ Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf);
+ if (dValue != null) {
+ stm.setReconnectionProgressionFactor(dValue);
+ }
+
+ stm.setWorkerPool(workerPool);
+
+ // remove processed properties from property bag
+ stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER);
+ stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY);
+ stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN);
+ stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED);
+ stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR);
+ stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE);
+ stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL);
+ stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL);
+ stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT);
+ stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS);
+ stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS);
+ stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT);
+ stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR);
+
+ return stm;
+ }
+
+ private static Map<String, String> getServiceStringParameters(List list) {
+
+ Map<String, String> map = new HashMap<String, String>();
+ for (Object o : list) {
+ Parameter p = (Parameter) o;
+ if (p.getValue() instanceof String) {
+ map.put(p.getName(), (String) p.getValue());
+ }
+ }
+ return map;
+ }
+
+ private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value == null) {
+ throw new AxisJMSException("Service/connection factory property : " + key);
+ }
+ return value;
+ }
+
+ private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ return value;
+ }
+
+ private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value == null) {
+ return null;
+ } else {
+ return Boolean.valueOf(value);
+ }
+ }
+
+ private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value != null) {
+ try {
+ return Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ throw new AxisJMSException("Invalid value : " + value + " for " + key);
+ }
+ }
+ return null;
+ }
+
+ private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value != null) {
+ try {
+ return Double.parseDouble(value);
+ } catch (NumberFormatException e) {
+ throw new AxisJMSException("Invalid value : " + value + " for " + key);
+ }
+ }
+ return null;
+ }
+
+ private static int getTransactionality(Map svcMap, Map cfMap) {
+
+ String key = BaseConstants.PARAM_TRANSACTIONALITY;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (val == null) {
+ return BaseConstants.TRANSACTION_NONE;
+
+ } else {
+ if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) {
+ return BaseConstants.TRANSACTION_JTA;
+ } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) {
+ return BaseConstants.TRANSACTION_LOCAL;
+ } else {
+ throw new AxisJMSException("Invalid option : " + val + " for parameter : " +
+ BaseConstants.STR_TRANSACTION_JTA);
+ }
+ }
+ }
+
+ private static int getDestinationType(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_DEST_TYPE;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) {
+ return JMSConstants.TOPIC;
+ }
+ return JMSConstants.QUEUE;
+ }
+
+ private static int getSessionAck(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_SESSION_ACK;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+ return Session.AUTO_ACKNOWLEDGE;
+ } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+ return Session.CLIENT_ACKNOWLEDGE;
+ } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){
+ return Session.DUPS_OK_ACKNOWLEDGE;
+ } else if ("SESSION_TRANSACTED".equals(val)) {
+ return 0; //Session.SESSION_TRANSACTED;
+ } else {
+ try {
+ return Integer.parseInt(val);
+ } catch (NumberFormatException ignore) {
+ throw new AxisJMSException("Invalid session acknowledgement mode : " + val);
+ }
+ }
+ }
+
+ private static int getCacheLevel(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_CACHE_LEVEL;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if ("none".equalsIgnoreCase(val)) {
+ return JMSConstants.CACHE_NONE;
+ } else if ("connection".equalsIgnoreCase(val)) {
+ return JMSConstants.CACHE_CONNECTION;
+ } else if ("session".equals(val)){
+ return JMSConstants.CACHE_SESSION;
+ } else if ("consumer".equals(val)) {
+ return JMSConstants.CACHE_CONSUMER;
+ } else if (val != null) {
+ throw new AxisJMSException("Invalid cache level : " + val);
+ }
+ return JMSConstants.CACHE_AUTO;
+ }
+
+ private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_JMS_SPEC_VER;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (val == null || "1.1".equals(val)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * This is a JMS spec independent method to create a Connection. Please be cautious when
+ * making any changes
+ *
+ * @param conFac the ConnectionFactory to use
+ * @param user optional user name
+ * @param pass optional password
+ * @param jmsSpec11 should we use JMS 1.1 API ?
+ * @param isQueue is this to deal with a Queue?
+ * @return a JMS Connection as requested
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static Connection createConnection(ConnectionFactory conFac,
+ String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException {
+
+ Connection connection = null;
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") +
+ "Connection using credentials : (" + user + "/" + pass + ")");
+ }
+
+ if (jmsSpec11 || isQueue == null) {
+ if (user != null && pass != null) {
+ connection = conFac.createConnection(user, pass);
+ } else {
+ connection = conFac.createConnection();
+ }
+
+ } else {
+ QueueConnectionFactory qConFac = null;
+ TopicConnectionFactory tConFac = null;
+ if (isQueue) {
+ tConFac = (TopicConnectionFactory) conFac;
+ } else {
+ qConFac = (QueueConnectionFactory) conFac;
+ }
+
+ if (user != null && pass != null) {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection(user, pass);
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection(user, pass);
+ }
+ } else {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection();
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection();
+ }
+ }
+ }
+ return connection;
+ }
+
+ /**
+ * This is a JMS spec independent method to create a Session. Please be cautious when
+ * making any changes
+ *
+ * @param connection the JMS Connection
+ * @param transacted should the session be transacted?
+ * @param ackMode the ACK mode for the session
+ * @param jmsSpec11 should we use the JMS 1.1 API?
+ * @param isQueue is this Session to deal with a Queue?
+ * @return a Session created for the given information
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static Session createSession(Connection connection, boolean transacted, int ackMode,
+ boolean jmsSpec11, Boolean isQueue) throws JMSException {
+
+ if (jmsSpec11 || isQueue == null) {
+ return connection.createSession(transacted, ackMode);
+
+ } else {
+ if (isQueue) {
+ return ((QueueConnection) connection).createQueueSession(transacted, ackMode);
+ } else {
+ return ((TopicConnection) connection).createTopicSession(transacted, ackMode);
+ }
+ }
+ }
+
+ /**
+ * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when
+ * making any changes
+ *
+ * @param session JMS session
+ * @param destination the Destination
+ * @param isQueue is the Destination a queue?
+ * @param subscriberName optional client name to use for a durable subscription to a topic
+ * @param messageSelector optional message selector
+ * @param pubSubNoLocal should we receive messages sent by us during pub-sub?
+ * @param isDurable is this a durable topic subscription?
+ * @param jmsSpec11 should we use JMS 1.1 API ?
+ * @return a MessageConsumer to receive messages
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static MessageConsumer createConsumer(
+ Session session, Destination destination, Boolean isQueue,
+ String subscriberName, String messageSelector, boolean pubSubNoLocal,
+ boolean isDurable, boolean jmsSpec11) throws JMSException {
+
+ if (jmsSpec11 || isQueue == null) {
+ if (isDurable) {
+ return session.createDurableSubscriber(
+ (Topic) destination, subscriberName, messageSelector, pubSubNoLocal);
+ } else {
+ return session.createConsumer(destination, messageSelector, pubSubNoLocal);
+ }
+ } else {
+ if (isQueue) {
+ return ((QueueSession) session).createReceiver((Queue) destination, messageSelector);
+ } else {
+ if (isDurable) {
+ return ((TopicSession) session).createDurableSubscriber(
+ (Topic) destination, subscriberName, messageSelector, pubSubNoLocal);
+ } else {
+ return ((TopicSession) session).createSubscriber(
+ (Topic) destination, messageSelector, pubSubNoLocal);
+ }
+ }
+ }
+ }
+
+ /**
+ * This is a JMS spec independent method to create a MessageProducer. Please be cautious when
+ * making any changes
+ *
+ * @param session JMS session
+ * @param destination the Destination
+ * @param isQueue is the Destination a queue?
+ * @param jmsSpec11 should we use JMS 1.1 API ?
+ * @return a MessageProducer to send messages to the given Destination
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static MessageProducer createProducer(
+ Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException {
+
+ if (jmsSpec11 || isQueue == null) {
+ return session.createProducer(destination);
+ } else {
+ if (isQueue) {
+ return ((QueueSession) session).createSender((Queue) destination);
+ } else {
+ return ((TopicSession) session).createPublisher((Topic) destination);
+ }
+ }
+ }
+
+ /**
+ * Create a one time MessageProducer for the given JMS OutTransport information
+ * For simplicity and best compatibility, this method uses only JMS 1.0.2b API.
+ * Please be cautious when making any changes
+ *
+ * @param jmsOut the JMS OutTransport information (contains all properties)
+ * @return a JMSSender based on one-time use resources
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut)
+ throws JMSException {
+
+ // digest the targetAddress and locate CF from the EPR
+ jmsOut.loadConnectionFactoryFromProperies();
+
+ // create a one time connection and session to be used
+ Hashtable<String,String> jmsProps = jmsOut.getProperties();
+ String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null;
+ String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null;
+
+ QueueConnectionFactory qConFac = null;
+ TopicConnectionFactory tConFac = null;
+
+ int destType = -1;
+ if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
+ destType = JMSConstants.QUEUE;
+ qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
+
+ } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
+ destType = JMSConstants.TOPIC;
+ tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
+ }
+
+ Connection connection = null;
+ if (user != null && pass != null) {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection(user, pass);
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection(user, pass);
+ }
+ } else {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection();
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection();
+ }
+ }
+
+ if (connection == null && jmsOut.getJmsConnectionFactory() != null) {
+ connection = jmsOut.getJmsConnectionFactory().getConnection();
+ }
+
+ Session session = null;
+ MessageProducer producer = null;
+ Destination destination = jmsOut.getDestination();
+
+ if (destType == JMSConstants.QUEUE) {
+ session = ((QueueConnection) connection).
+ createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = ((QueueSession) session).createSender((Queue) destination);
+ } else {
+ session = ((TopicConnection) connection).
+ createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = ((TopicSession) session).createPublisher((Topic) destination);
+ }
+
+ return new JMSMessageSender(connection, session, producer,
+ destination, (jmsOut.getJmsConnectionFactory() == null ?
+ JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false,
+ destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE);
+ }
+
+ /**
+ * Return a String representation of the destination type
+ * @param destType the destination type indicator int
+ * @return a descriptive String
+ */
+ public static String getDestinationTypeAsString(int destType) {
+ if (destType == JMSConstants.QUEUE) {
+ return "Queue";
+ } else if (destType == JMSConstants.TOPIC) {
+ return "Topic";
+ } else {
+ return "Generic";
+ }
+ }
}