You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2008/06/22 09:00:59 UTC
svn commit: r670316 -
/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java
Author: ruwan
Date: Sun Jun 22 00:00:58 2008
New Revision: 670316
URL: http://svn.apache.org/viewvc?rev=670316&view=rev
Log:
Code reformatting, introduction of Java 5 features, and improved Javadocs.
No functional changes.
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java?rev=670316&r1=670315&r2=670316&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java Sun Jun 22 00:00:58 2008
@@ -15,44 +15,27 @@
*/
package org.apache.synapse.transport.jms;
-import org.apache.axiom.om.OMOutputFormat;
-import org.apache.axiom.om.OMText;
import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.util.StAXUtils;
-import org.apache.axiom.om.impl.builder.StAXBuilder;
-import org.apache.axiom.om.impl.builder.StAXOMBuilder;
-import org.apache.axiom.om.impl.llom.OMTextImpl;
-import org.apache.axiom.soap.SOAP11Constants;
-import org.apache.axiom.soap.SOAP12Constants;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
-import org.apache.axiom.attachments.ByteArrayDataSource;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
import org.apache.axis2.builder.BuilderUtil;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.ParameterIncludeImpl;
-import org.apache.axis2.transport.http.HTTPTransportUtils;
-import org.apache.synapse.transport.base.BaseUtils;
-import org.apache.synapse.transport.base.BaseConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.base.BaseConstants;
+import org.apache.synapse.transport.base.BaseUtils;
import javax.jms.*;
import javax.jms.Queue;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.namespace.QName;
-import javax.activation.DataHandler;
import javax.naming.Context;
-import java.io.*;
-import java.util.*;
-import java.nio.ByteBuffer;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
+import java.util.*;
/**
* Miscellaneous methods used for the JMS transport
@@ -78,9 +61,12 @@
* @return the JMS Destination name of the created Queue
* @throws JMSException on error
*/
- public static String createJMSQueue(Connection con, String destinationJNDIName) throws JMSException {
+ public static String createJMSQueue(Connection con, String destinationJNDIName)
+ throws JMSException {
+
try {
- QueueSession session = ((QueueConnection) con).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSession session
+ = ((QueueConnection) con).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(destinationJNDIName);
log.info("JMS Queue with JNDI name : " + destinationJNDIName + " created");
return queue.getQueueName();
@@ -101,9 +87,12 @@
* @return the JMS Destination name of the created Topic
* @throws JMSException on error
*/
- public static String createJMSTopic(Connection con, String destinationJNDIName) throws JMSException {
+ public static String createJMSTopic(Connection con, String destinationJNDIName)
+ throws JMSException {
+
try {
- TopicSession session = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSession session
+ = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(destinationJNDIName);
log.info("JMS Topic with JNDI name : " + destinationJNDIName + " created");
return topic.getTopicName();
@@ -127,8 +116,8 @@
} else {
List transports = service.getExposedTransports();
- for (int i = 0; i < transports.size(); i++) {
- if (JMSListener.TRANSPORT_NAME.equals(transports.get(i))) {
+ for (Object transport : transports) {
+ if (JMSListener.TRANSPORT_NAME.equals(transport)) {
return true;
}
}
@@ -161,7 +150,8 @@
Parameter destTypeParam = service.getParameter(JMSConstants.DEST_PARAM_TYPE);
if (destTypeParam != null) {
String paramValue = (String) destTypeParam.getValue();
- if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
+ if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
+ JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
return paramValue;
} else {
handleException("Invalid destinaton type value " + paramValue);
@@ -190,7 +180,7 @@
if (sep != -1) {
h.put(token.substring(0, sep), token.substring(sep + 1));
} else {
- continue; // ignore, what else can we do?
+ // ignore, what else can we do?
}
}
}
@@ -268,37 +258,36 @@
jmsConFactory.getName(), axisFault);
}
- Iterator params = pi.getParameters().iterator();
- while (params.hasNext()) {
+ for (Object o : pi.getParameters()) {
- Parameter p = (Parameter) params.next();
+ Parameter p = (Parameter) o;
if (JMSConstants.CONFAC_TYPE.equals(p.getName())) {
String connectionFactoryType = (String) p.getValue();
jmsConFactory.setConnectionFactoryType(connectionFactoryType);
-
+
} else if (JMSConstants.RECONNECT_TIMEOUT.equals(p.getName())) {
- String strTimeout = (String) p.getValue();
- int reconnectTimeoutSeconds = Integer.parseInt(strTimeout);
- long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000;
- jmsConFactory.setReconnectTimeout(reconnectTimeoutMillis);
-
+ String strTimeout = (String) p.getValue();
+ int reconnectTimeoutSeconds = Integer.parseInt(strTimeout);
+ long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000;
+ jmsConFactory.setReconnectTimeout(reconnectTimeoutMillis);
+
} else if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) {
jmsConFactory.addJNDIContextProperty(
- Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue());
+ Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue());
} else if (Context.PROVIDER_URL.equals(p.getName())) {
jmsConFactory.addJNDIContextProperty(
- Context.PROVIDER_URL, (String) p.getValue());
+ Context.PROVIDER_URL, (String) p.getValue());
} else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) {
jmsConFactory.addJNDIContextProperty(
- Context.SECURITY_PRINCIPAL, (String) p.getValue());
+ Context.SECURITY_PRINCIPAL, (String) p.getValue());
} else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) {
jmsConFactory.addJNDIContextProperty(
- Context.SECURITY_CREDENTIALS, (String) p.getValue());
+ Context.SECURITY_CREDENTIALS, (String) p.getValue());
} else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) {
jmsConFactory.setConnFactoryJNDIName((String) p.getValue());
jmsConFactory.addJNDIContextProperty(
- JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue());
+ JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue());
}
}
}
@@ -375,6 +364,7 @@
if (log.isDebugEnabled()) {
try {
+ assert replyDestination != null;
log.debug("Expecting a response to JMS Destination : " +
(replyDestination instanceof Queue ?
((Queue) replyDestination).getQueueName() :
@@ -388,13 +378,15 @@
* When trying to send a message to a destination, if it does not exist, try to create it
*
* @param destination the JMS destination to send messages
+ * @param destinationType type of the destination (can be a queue or a topic)
* @param targetAddress the target JMS EPR to find the Destination to be created if required
* @param session the JMS session to use
* @return the JMS Destination where messages could be posted
* @throws AxisFault if the target Destination does not exist and cannot be created
*/
- public static Destination createDestinationIfRequired(Destination destination, String destinationType,
- String targetAddress, Session session) throws AxisFault {
+ public static Destination createDestinationIfRequired(Destination destination,
+ String destinationType, String targetAddress, Session session) throws AxisFault {
+
if (destination == null) {
if (targetAddress != null) {
String name = JMSUtils.getDestination(targetAddress);
@@ -416,9 +408,19 @@
/**
* If reply destination does not exist, try to create it
+ *
+ * @param destination the destination queue or topic
+ * @param replyDestinationName name of the reply destination queue or topic
+ * @param destinationType type of the destination (can be queue or topic)
+ * @param targetAddress target address of the queue or topic
+ * @param session JMS session with the message to be sent
+ * @return destination created if the destination is null or the destination otherwise
+ * @throws org.apache.axis2.AxisFault in case of an error in creating the destination
*/
public static Destination createReplyDestinationIfRequired(Destination destination,
- String replyDestinationName, String destinationType, String targetAddress, Session session) throws AxisFault {
+ String replyDestinationName, String destinationType, String targetAddress, Session session)
+ throws AxisFault {
+
if (destination == null) {
if (targetAddress != null) {
if (log.isDebugEnabled()) {
@@ -428,7 +430,8 @@
try {
destination = createDestination(session, replyDestinationName, destinationType);
} catch (JMSException e) {
- handleException("Error creating reply destination : " + replyDestinationName, e);
+ handleException("Error creating reply destination : "
+ + replyDestinationName, e);
}
} else {
handleException("Cannot send reply to null reply JMS Destination");
@@ -439,8 +442,10 @@
/**
* Send the given message to the Destination using the given session
+ *
* @param session the session to use to send
* @param destination the Destination
+ * @param destinationType type of the destination (can be a queue or a topic)
* @param message the JMS Message
* @throws AxisFault on error
*/
@@ -458,7 +463,7 @@
((TopicPublisher) producer).publish(message);
} else {
producer = ((QueueSession) session).createSender((Queue) destination);
- ((QueueSender) producer).send(message);
+ producer.send(message);
}
if (log.isDebugEnabled()) {
@@ -494,19 +499,18 @@
if (headerMap == null) {
return;
}
-
- Iterator iter = headerMap.keySet().iterator();
- while (iter.hasNext()) {
- String name = (String) iter.next();
+ for (Object headerName : headerMap.keySet()) {
+
+ String name = (String) headerName;
if (JMSConstants.JMS_COORELATION_ID.equals(name)) {
- message.setJMSCorrelationID((String) headerMap.get(JMSConstants.JMS_COORELATION_ID));
- }
- else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) {
+ message.setJMSCorrelationID(
+ (String) headerMap.get(JMSConstants.JMS_COORELATION_ID));
+ } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) {
Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE);
if (o instanceof Integer) {
- message.setJMSDeliveryMode(((Integer) o).intValue());
+ message.setJMSDeliveryMode((Integer) o);
} else if (o instanceof String) {
try {
message.setJMSDeliveryMode(Integer.parseInt((String) o));
@@ -516,39 +520,33 @@
} else {
log.warn("Invalid delivery mode ignored : " + o);
}
- }
- else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
+ } else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
message.setJMSExpiration(
- Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
- }
- else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
+ Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
+ } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID));
- }
- else if (JMSConstants.JMS_PRIORITY.equals(name)) {
+ } else if (JMSConstants.JMS_PRIORITY.equals(name)) {
message.setJMSPriority(
- Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
- }
- else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
+ Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
+ } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
message.setJMSTimestamp(
- Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
- }
- else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
+ Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
+ } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE));
- }
- else {
+ } else {
Object value = headerMap.get(name);
if (value instanceof String) {
message.setStringProperty(name, (String) value);
} else if (value instanceof Boolean) {
- message.setBooleanProperty(name, ((Boolean) value).booleanValue());
+ message.setBooleanProperty(name, (Boolean) value);
} else if (value instanceof Integer) {
- message.setIntProperty(name, ((Integer) value).intValue());
+ message.setIntProperty(name, (Integer) value);
} else if (value instanceof Long) {
- message.setLongProperty(name, ((Long) value).longValue());
+ message.setLongProperty(name, (Long) value);
} else if (value instanceof Double) {
- message.setDoubleProperty(name, ((Double) value).doubleValue());
+ message.setDoubleProperty(name, (Double) value);
} else if (value instanceof Float) {
- message.setFloatProperty(name, ((Float) value).floatValue());
+ message.setFloatProperty(name, (Float) value);
}
}
}
@@ -572,9 +570,9 @@
* @param message the JMS message
* @return a Map of the transport headers
*/
- public static Map getTransportHeaders(Message message) {
+ public static Map<String, Object> getTransportHeaders(Message message) {
// create a Map to hold transport headers
- Map map = new HashMap();
+ Map<String, Object> map = new HashMap<String, Object>();
// correlation ID
try {
@@ -656,24 +654,23 @@
continue;
} catch (JMSException ignore) {}
try {
- map.put(headerName, Boolean.valueOf(message.getBooleanProperty(headerName)));
+ map.put(headerName, message.getBooleanProperty(headerName));
continue;
} catch (JMSException ignore) {}
try {
- map.put(headerName, new Integer(message.getIntProperty(headerName)));
+ map.put(headerName, message.getIntProperty(headerName));
continue;
} catch (JMSException ignore) {}
try {
- map.put(headerName, new Long(message.getLongProperty(headerName)));
+ map.put(headerName, message.getLongProperty(headerName));
continue;
} catch (JMSException ignore) {}
try {
- map.put(headerName, new Double(message.getDoubleProperty(headerName)));
+ map.put(headerName, message.getDoubleProperty(headerName));
continue;
} catch (JMSException ignore) {}
try {
- map.put(headerName, new Float(message.getFloatProperty(headerName)));
- continue;
+ map.put(headerName, message.getFloatProperty(headerName));
} catch (JMSException ignore) {}
}
}
@@ -720,8 +717,8 @@
}
// ----------- JMS 1.0.2b compatibility methods -------------
- public static Connection createConnection(
- ConnectionFactory conFactory, String user, String pass, String destinationType) throws JMSException {
+ public static Connection createConnection(ConnectionFactory conFactory, String user,
+ String pass, String destinationType) throws JMSException {
if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) {
if (user != null && pass != null) {
@@ -750,21 +747,23 @@
} else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode);
} else {
- log.debug("JMS destination type not given or invalid. default queue. was " + destinationType);
+ log.debug("JMS destination type not given or invalid, was '" + destinationType +
+ "'. Taking the default value as queue");
return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
}
}
- public static Destination createDestination(Session session, String destName, String destinationType)
- throws JMSException {
+ public static Destination createDestination(Session session, String destName,
+ String destinationType) throws JMSException {
if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
- return ((QueueSession) session).createQueue(destName);
+ return session.createQueue(destName);
} else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
- return ((TopicSession) session).createTopic(destName);
+ return session.createTopic(destName);
} else {
- log.debug("JMS destination type not given or invalid. default queue. was " + destinationType);
- return ((QueueSession) session).createQueue(destName);
+ log.debug("JMS destination type not given or invalid, was '" + destinationType +
+ "'. Taking the default value as queue");
+ return session.createQueue(destName);
}
}
@@ -794,9 +793,9 @@
public static Destination createTemporaryDestination(Session session) throws JMSException {
if (session instanceof QueueSession) {
- return ((QueueSession) session).createTemporaryQueue();
+ return session.createTemporaryQueue();
} else {
- return ((TopicSession) session).createTemporaryTopic();
+ return session.createTemporaryTopic();
}
}
@@ -820,11 +819,9 @@
bMsg.reset();
for (int bytesRead = bMsg.readBytes(buffer); bytesRead != -1;
bytesRead = bMsg.readBytes(buffer)) {
- if (bytesRead != -1) {
length += bytesRead;
- }
}
- } catch (JMSException e) {}
+ } catch (JMSException ignore) {}
return length;
}
}