You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/07 11:38:51 UTC
[2/5] camel git commit: CAMEL-9116: camel-sjms should use same
binding to/from JMS as camel-jms does.
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index 0a93f16..f3224b6 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -16,408 +16,272 @@
*/
package org.apache.camel.component.sjms.jms;
-import java.io.File;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
-import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MapMessage;
import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
-import org.apache.camel.component.sjms.SjmsConstants;
-import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.impl.DefaultMessage;
-import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
/**
* Utility class for {@link javax.jms.Message}.
*/
-public final class JmsMessageHelper implements JmsConstants {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmsMessageHelper.class);
+public final class JmsMessageHelper {
private JmsMessageHelper() {
}
- public static Exchange createExchange(Message message, Endpoint endpoint) {
- return createExchange(message, endpoint, null);
- }
-
/**
- * Creates an Exchange from a JMS Message.
- * @param message The JMS message.
- * @param endpoint The Endpoint to use to create the Exchange object.
- * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
- * format keys in a JMS 1.1 compliant manner. If null the
- * {@link DefaultJmsKeyFormatStrategy} will be used.
- * @return Populated Exchange.
+ * Removes the property from the JMS message.
+ *
+ * @param jmsMessage the JMS message
+ * @param name name of the property to remove
+ * @return the old value of the property, or <tt>null</tt> if it does not exist (NOTE(review): the loop below currently assigns the property name, not its value)
+ * @throws javax.jms.JMSException can be thrown
*/
- public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) {
- Exchange exchange = endpoint.createExchange();
- KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null)
- ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy;
- return populateExchange(message, exchange, false, initialisedKeyFormatStrategy);
- }
-
- @SuppressWarnings("unchecked")
- public static Exchange populateExchange(Message message, Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) {
- try {
- setJmsMessageHeaders(message, exchange, out, keyFormatStrategy);
- if (message != null) {
- // convert to JMS Message of the given type
+ public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException {
+ // check if the property exists
+ if (!jmsMessage.propertyExists(name)) {
+ return null;
+ }
- DefaultMessage bodyMessage;
- if (out) {
- bodyMessage = (DefaultMessage) exchange.getOut();
- } else {
- bodyMessage = (DefaultMessage) exchange.getIn();
- }
- switch (JmsMessageHelper.discoverJmsMessageType(message)) {
- case Bytes:
- BytesMessage bytesMessage = (BytesMessage) message;
- if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
- LOG.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
- return null;
- }
- byte[] result = new byte[(int) bytesMessage.getBodyLength()];
- bytesMessage.readBytes(result);
- bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Bytes);
- bodyMessage.setBody(result);
- break;
- case Map:
- Map<String, Object> body = new HashMap<String, Object>();
- MapMessage mapMessage = (MapMessage) message;
- Enumeration<String> names = mapMessage.getMapNames();
- while (names.hasMoreElements()) {
- String key = names.nextElement();
- Object value = mapMessage.getObject(key);
- body.put(key, value);
- }
- bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Map);
- bodyMessage.setBody(body);
- break;
- case Object:
- ObjectMessage objMsg = (ObjectMessage) message;
- bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Object);
- bodyMessage.setBody(objMsg.getObject());
- break;
- case Text:
- TextMessage textMsg = (TextMessage) message;
- bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Text);
- bodyMessage.setBody(textMsg.getText());
- break;
- case Stream:
- StreamMessage streamMessage = (StreamMessage) message;
- List<Object> list = new ArrayList<Object>();
- Object obj;
- while ((obj = streamMessage.readObject()) != null) {
- list.add(obj);
- }
- bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Stream);
- bodyMessage.setBody(list);
- break;
- case Message:
- default:
- // Do nothing. Only set the headers for an empty message
- bodyMessage.setBody(message);
- break;
- }
+ Object answer = null;
+
+ // store the properties we want to keep in a temporary map
+ // as the JMS API is a bit strict as we are not allowed to
+ // clear a single property, but must clear them all and redo
+ // the properties
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ Enumeration<?> en = jmsMessage.getPropertyNames();
+ while (en.hasMoreElements()) {
+ String key = (String) en.nextElement();
+ if (name.equals(key)) {
+ answer = key;
+ } else {
+ map.put(key, getProperty(jmsMessage, key));
}
- } catch (Exception e) {
- exchange.setException(e);
}
- return exchange;
- }
-
- public static Message createMessage(Exchange exchange, Session session, SjmsEndpoint endpoint) throws Exception {
- Message answer;
- Object body;
- Map<String, Object> bodyHeaders;
- if (exchange.getOut().getBody() != null) {
- body = exchange.getOut().getBody();
- bodyHeaders = new HashMap<String, Object>(exchange.getOut().getHeaders());
- } else {
- body = exchange.getIn().getBody();
- bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders());
+ // redo the properties to keep
+ jmsMessage.clearProperties();
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ jmsMessage.setObjectProperty(entry.getKey(), entry.getValue());
}
- answer = createMessage(exchange, session, body, bodyHeaders, endpoint);
return answer;
}
- public static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception {
- return createMessage(exchange, session, payload, messageHeaders, endpoint.isAllowNullBody(),
- endpoint.getSjmsHeaderFilterStrategy(), endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter());
- }
-
- private static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, boolean allowNullBody,
- HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception {
- Message answer = null;
- JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload);
-
- switch (messageType) {
- case Bytes:
- BytesMessage bytesMessage = session.createBytesMessage();
- byte[] bytesToWrite = typeConverter.convertTo(byte[].class, payload);
- bytesMessage.writeBytes(bytesToWrite);
- answer = bytesMessage;
- break;
- case Map:
- MapMessage mapMessage = session.createMapMessage();
- Map objMap = (Map) payload;
- for (final Map.Entry entry : (Set<Map.Entry>)objMap.entrySet()) {
- mapMessage.setObject(entry.getKey().toString(), entry.getValue());
- }
- answer = mapMessage;
- break;
- case Object:
- ObjectMessage objectMessage = session.createObjectMessage();
- objectMessage.setObject((Serializable) payload);
- answer = objectMessage;
- break;
- case Text:
- TextMessage textMessage = session.createTextMessage();
- String convertedText = typeConverter.convertTo(String.class, payload);
- textMessage.setText(convertedText);
- answer = textMessage;
- break;
- case Stream:
- StreamMessage streamMessage = session.createStreamMessage();
- Collection collection = (Collection)payload;
- for (final Object obj : collection) {
- streamMessage.writeObject(obj);
- }
- answer = streamMessage;
- break;
- case Message:
- if (allowNullBody && payload == null) {
- answer = session.createMessage();
- } else if (payload != null) {
- throw new JMSException("Unsupported message body type " + ObjectHelper.classCanonicalName(payload));
- } else {
- throw new JMSException("Null body is not allowed");
+ /**
+ * Tests whether a given property with the name exists
+ *
+ * @param jmsMessage the JMS message
+ * @param name name of the property to test if exists
+ * @return <tt>true</tt> if the property exists, <tt>false</tt> if not.
+ * @throws JMSException can be thrown
+ */
+ public static boolean hasProperty(Message jmsMessage, String name) throws JMSException {
+ Enumeration<?> en = jmsMessage.getPropertyNames();
+ while (en.hasMoreElements()) {
+ String key = (String) en.nextElement();
+ if (name.equals(key)) {
+ return true;
}
- break;
- default:
- break;
}
-
- appendJmsProperties(answer, exchange, exchange.getIn(), headerFilterStrategy, keyFormatStrategy);
- return answer;
+ return false;
}
/**
- * Appends the JMS headers from the Camel {@link Message}
+ * Gets a JMS property
+ *
+ * @param jmsMessage the JMS message
+ * @param name name of the property to get
+ * @return the property value, or <tt>null</tt> if it does not exist
+ * @throws JMSException can be thrown
*/
- private static void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in,
- HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException {
- Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
- for (Map.Entry<String, Object> entry : entries) {
- String headerName = entry.getKey();
- Object headerValue = entry.getValue();
- appendJmsProperty(jmsMessage, exchange, in, headerName, headerValue, headerFilterStrategy, keyFormatStrategy);
+ public static Object getProperty(Message jmsMessage, String name) throws JMSException {
+ Object value = jmsMessage.getObjectProperty(name);
+ if (value == null) {
+ value = jmsMessage.getStringProperty(name);
}
+ return value;
}
- private static void appendJmsProperty(Message jmsMessage, Exchange exchange, org.apache.camel.Message in,
- String headerName, Object headerValue,
- HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException {
- if (isStandardJMSHeader(headerName)) {
- if (headerName.equals("JMSCorrelationID")) {
- jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
- } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
- if (headerValue instanceof String) {
- // if the value is a String we must normalize it first, and must include the prefix
- // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
- headerValue = normalizeDestinationName((String) headerValue, true);
- }
- Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
- JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
- } else if (headerName.equals("JMSType")) {
- jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
- } else if (headerName.equals("JMSPriority")) {
- jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
- } else if (headerName.equals("JMSDeliveryMode")) {
- JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
- } else if (headerName.equals("JMSExpiration")) {
- jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
- } else {
- // The following properties are set by the MessageProducer:
- // JMSDestination
- // The following are set on the underlying JMS provider:
- // JMSMessageID, JMSTimestamp, JMSRedelivered
- // log at trace level to not spam log
- LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
- }
- } else if (shouldOutputHeader(in, headerName, headerValue, exchange, headerFilterStrategy)) {
- // only primitive headers and strings is allowed as properties
- // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
- Object value = getValidJMSHeaderValue(headerName, headerValue);
- if (value != null) {
- // must encode to safe JMS header name before setting property on jmsMessage
- String key = keyFormatStrategy.encodeKey(headerName);
- // set the property
- JmsMessageHelper.setProperty(jmsMessage, key, value);
- } else if (LOG.isDebugEnabled()) {
- // okay the value is not a primitive or string so we cannot sent it over the wire
- LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}",
- new Object[]{headerName, headerValue.getClass().getName(), headerValue});
- }
+ /**
+ * Sets the property on the given JMS message, using the typed setter that matches the value's class.
+ *
+ * @param jmsMessage the JMS message
+ * @param name name of the property to set
+ * @param value the value; if <tt>null</tt> nothing is set and the method returns immediately
+ * @throws JMSException can be thrown
+ */
+ public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
+ if (value == null) {
+ return;
+ }
+ if (value instanceof Byte) {
+ jmsMessage.setByteProperty(name, (Byte) value);
+ } else if (value instanceof Boolean) {
+ jmsMessage.setBooleanProperty(name, (Boolean) value);
+ } else if (value instanceof Double) {
+ jmsMessage.setDoubleProperty(name, (Double) value);
+ } else if (value instanceof Float) {
+ jmsMessage.setFloatProperty(name, (Float) value);
+ } else if (value instanceof Integer) {
+ jmsMessage.setIntProperty(name, (Integer) value);
+ } else if (value instanceof Long) {
+ jmsMessage.setLongProperty(name, (Long) value);
+ } else if (value instanceof Short) {
+ jmsMessage.setShortProperty(name, (Short) value);
+ } else if (value instanceof String) {
+ jmsMessage.setStringProperty(name, (String) value);
+ } else {
+ // any other type: fall back to the generic Object setter
+ jmsMessage.setObjectProperty(name, value);
}
}
/**
- * Is the given header a standard JMS header
- * @param headerName the header name
- * @return <tt>true</tt> if its a standard JMS header
+ * Sets the correlation id on the JMS message.
+ * <p/>
+ * Any JMSException thrown while setting the id is ignored.
+ *
+ * @param message the JMS message
+ * @param correlationId the correlation id
*/
- protected static boolean isStandardJMSHeader(String headerName) {
- if (!headerName.startsWith("JMS")) {
- return false;
- }
- if (headerName.startsWith("JMSX")) {
- return false;
+ public static void setCorrelationId(Message message, String correlationId) {
+ try {
+ message.setJMSCorrelationID(correlationId);
+ } catch (JMSException e) {
+ // ignore
}
- // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM)
- if (headerName.startsWith("JMS_")) {
+ }
+
+ /**
+ * Whether the destination name has either queue or temp queue prefix.
+ *
+ * @param destination the destination
+ * @return <tt>true</tt> if queue or temp-queue prefix, <tt>false</tt> otherwise
+ */
+ public static boolean isQueuePrefix(String destination) {
+ if (ObjectHelper.isEmpty(destination)) {
return false;
}
- // the 4th char must be a letter to be a standard JMS header
- if (headerName.length() > 3) {
- Character fourth = headerName.charAt(3);
- if (Character.isLetter(fourth)) {
- return true;
- }
+ return destination.startsWith(JmsConstants.QUEUE_PREFIX) || destination.startsWith(JmsConstants.TEMP_QUEUE_PREFIX);
+ }
+
+ /**
+ * Whether the destination name has either topic or temp topic prefix.
+ *
+ * @param destination the destination
+ * @return <tt>true</tt> if topic or temp-topic prefix, <tt>false</tt> otherwise
+ */
+ public static boolean isTopicPrefix(String destination) {
+ if (ObjectHelper.isEmpty(destination)) {
+ return false;
}
- return false;
+ return destination.startsWith(JmsConstants.TOPIC_PREFIX) || destination.startsWith(JmsConstants.TEMP_TOPIC_PREFIX);
}
/**
- * Strategy to test if the given header is valid according to the JMS spec to be set as a property
- * on the JMS message.
+ * Normalizes the destination name.
* <p/>
- * This default implementation will allow:
- * <ul>
- * <li>any primitives and their counter Objects (Integer, Double etc.)</li>
- * <li>String and any other literals, Character, CharSequence</li>
- * <li>Boolean</li>
- * <li>Number</li>
- * <li>java.util.Date</li>
- * </ul>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
*
- * @param headerName the header name
- * @param headerValue the header value
- * @return the value to use, <tt>null</tt> to ignore this header
+ * @param destination the destination
+ * @return the normalized destination
*/
- protected static Object getValidJMSHeaderValue(String headerName, Object headerValue) {
- if (headerValue instanceof String) {
- return headerValue;
- } else if (headerValue instanceof BigInteger) {
- return headerValue.toString();
- } else if (headerValue instanceof BigDecimal) {
- return headerValue.toString();
- } else if (headerValue instanceof Number) {
- return headerValue;
- } else if (headerValue instanceof Character) {
- return headerValue;
- } else if (headerValue instanceof CharSequence) {
- return headerValue.toString();
- } else if (headerValue instanceof Boolean) {
- return headerValue;
- } else if (headerValue instanceof Date) {
- return headerValue.toString();
- }
- return null;
+ public static String normalizeDestinationName(String destination) {
+ // do not include prefix which is the current behavior when using this method.
+ return normalizeDestinationName(destination, false);
}
- @SuppressWarnings("unchecked")
- public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) throws JMSException {
- Map<String, Object> headers = new HashMap<String, Object>();
- if (jmsMessage != null) {
- // lets populate the standard JMS message headers
- try {
- headers.put(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID());
- headers.put(JMS_DELIVERY_MODE, jmsMessage.getJMSDeliveryMode());
- headers.put(JMS_DESTINATION, jmsMessage.getJMSDestination());
- headers.put(JMS_EXPIRATION, jmsMessage.getJMSExpiration());
- headers.put(JMS_MESSAGE_ID, jmsMessage.getJMSMessageID());
- headers.put(JMS_PRIORITY, jmsMessage.getJMSPriority());
- headers.put(JMS_REDELIVERED, jmsMessage.getJMSRedelivered());
- headers.put(JMS_TIMESTAMP, jmsMessage.getJMSTimestamp());
- headers.put(JMS_REPLY_TO, getJMSReplyTo(jmsMessage));
- headers.put(JMS_TYPE, getJMSType(jmsMessage));
-
- // this works around a bug in the ActiveMQ property handling
- headers.put(JMSX_GROUP_ID, jmsMessage.getStringProperty(JMSX_GROUP_ID));
- } catch (JMSException e) {
- throw new RuntimeCamelException(e);
+ /**
+ * Normalizes the destination name.
+ * <p/>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
+ *
+ * @param destination the destination
+ * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
+ * @return the normalized destination
+ */
+ public static String normalizeDestinationName(String destination, boolean includePrefix) {
+ if (ObjectHelper.isEmpty(destination)) {
+ return destination;
+ }
+ if (destination.startsWith(JmsConstants.QUEUE_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(JmsConstants.QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = JmsConstants.QUEUE_PREFIX + "//" + s;
}
-
- for (final Enumeration<String> enumeration = jmsMessage.getPropertyNames(); enumeration.hasMoreElements();) {
- String key = enumeration.nextElement();
- if (hasIllegalHeaderKey(key)) {
- throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value");
- }
- Object value = jmsMessage.getObjectProperty(key);
- String decodedName = keyFormatStrategy.decodeKey(key);
- headers.put(decodedName, value);
+ return s;
+ } else if (destination.startsWith(JmsConstants.TEMP_QUEUE_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(JmsConstants.TEMP_QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = JmsConstants.TEMP_QUEUE_PREFIX + "//" + s;
}
- }
- if (out) {
- exchange.getOut().setHeaders(headers);
+ return s;
+ } else if (destination.startsWith(JmsConstants.TOPIC_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(JmsConstants.TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = JmsConstants.TOPIC_PREFIX + "//" + s;
+ }
+ return s;
+ } else if (destination.startsWith(JmsConstants.TEMP_TOPIC_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(JmsConstants.TEMP_TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = JmsConstants.TEMP_TOPIC_PREFIX + "//" + s;
+ }
+ return s;
} else {
- exchange.getIn().setHeaders(headers);
+ return destination;
}
- return exchange;
}
/**
- * Strategy to allow filtering of headers which are put on the JMS message
- * <p/>
- * <b>Note</b>: Currently only supports sending java identifiers as keys
+ * Sets the JMSReplyTo on the message.
+ *
+ * @param message the message
+ * @param replyTo the reply to destination
*/
- protected static boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String headerName,
- Object headerValue, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
- return headerFilterStrategy == null
- || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange);
+ public static void setJMSReplyTo(Message message, Destination replyTo) {
+ try {
+ message.setJMSReplyTo(replyTo);
+ } catch (Exception e) {
+ // deliberately ignored, as OracleAQ does not support accessing JMSReplyTo
+ }
+ }
+
+ /**
+ * Gets the JMSReplyTo from the message.
+ *
+ * @param message the message
+ * @return the reply to, can be <tt>null</tt>; also <tt>null</tt> if reading it threw an exception
+ */
+ public static Destination getJMSReplyTo(Message message) {
+ try {
+ return message.getJMSReplyTo();
+ } catch (Exception e) {
+ // deliberately ignored, as OracleAQ does not support accessing JMSReplyTo
+ }
+
+ return null;
}
/**
* Gets the JMSType from the message.
*
- * @param message the message
+ * @param message the message
* @return the type, can be <tt>null</tt>
*/
public static String getJMSType(Message message) {
@@ -431,6 +295,54 @@ public final class JmsMessageHelper implements JmsConstants {
}
/**
+ * Gets the String property named <tt>propertyName</tt> from the message.
+ *
+ * @param message the message
+ * @return the property value, can be <tt>null</tt>; also <tt>null</tt> if reading it threw an exception
+ */
+ public static String getStringProperty(Message message, String propertyName) {
+ try {
+ return message.getStringProperty(propertyName);
+ } catch (Exception e) {
+ // deliberately ignored, as some broker clients do not support accessing a String property
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets the JMSRedelivered flag from the message.
+ *
+ * @param message the message
+ * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, <tt>null</tt> if it could not be determined because reading it threw an exception
+ */
+ public static Boolean getJMSRedelivered(Message message) {
+ try {
+ return message.getJMSRedelivered();
+ } catch (Exception e) {
+ // deliberately ignored: the JMS broker may not support this
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets the JMSMessageID from the message.
+ *
+ * @param message the message
+ * @return the JMSMessageID, or <tt>null</tt> if reading it threw an exception
+ */
+ public static String getJMSMessageID(Message message) {
+ try {
+ return message.getJMSMessageID();
+ } catch (Exception e) {
+ // deliberately ignored: the JMS broker may not support this
+ }
+
+ return null;
+ }
+
+ /**
* Sets the JMSDeliveryMode on the message.
*
* @param exchange the exchange
@@ -471,219 +383,19 @@ public final class JmsMessageHelper implements JmsConstants {
}
/**
- * Sets the correlation id on the JMS message.
- * <p/>
- * Will ignore exception thrown
- *
- * @param message the JMS message
- * @param type the correlation id
- */
- public static void setMessageType(Message message, String type) {
- try {
- message.setJMSType(type);
- } catch (JMSException e) {
- LOG.debug("Error setting the message type: {}", type, e);
- }
- }
-
- /**
- * Sets the correlation id on the JMS message.
- * <p/>
- * Will ignore exception thrown
- *
- * @param message the JMS message
- * @param correlationId the correlation id
- */
- public static void setCorrelationId(Message message, String correlationId) {
- try {
- message.setJMSCorrelationID(correlationId);
- } catch (JMSException e) {
- LOG.debug("Error setting the correlationId: {}", correlationId, e);
- }
- }
-
- /**
- * Sets the JMSReplyTo on the message.
+ * Gets the JMSCorrelationIDAsBytes from the message.
*
* @param message the message
- * @param replyTo the reply to destination
+ * @return the JMSCorrelationIDAsBytes decoded to a String with the platform default charset, or <tt>null</tt> if not able to get
*/
- public static void setJMSReplyTo(Message message, Destination replyTo) {
+ public static String getJMSCorrelationIDAsBytes(Message message) {
try {
- message.setJMSReplyTo(replyTo);
+ return new String(message.getJMSCorrelationIDAsBytes());
} catch (Exception e) {
- LOG.debug("Error setting the correlationId: {}", replyTo.toString());
+ // deliberately ignored: the JMS broker may not support this
}
- }
- /**
- * Gets the JMSReplyTo from the message.
- *
- * @param message the message
- * @return the reply to, can be <tt>null</tt>
- */
- public static Destination getJMSReplyTo(Message message) {
- try {
- return message.getJMSReplyTo();
- } catch (Exception e) {
- // ignore due OracleAQ does not support accessing JMSReplyTo
- }
return null;
}
- /**
- * Sets the property on the given JMS message.
- *
- * @param jmsMessage the JMS message
- * @param name name of the property to set
- * @param value the value
- * @throws JMSException can be thrown
- */
- public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
- if (value == null) {
- jmsMessage.setObjectProperty(name, null);
- } else if (value instanceof Byte) {
- jmsMessage.setByteProperty(name, (Byte) value);
- } else if (value instanceof Boolean) {
- jmsMessage.setBooleanProperty(name, (Boolean) value);
- } else if (value instanceof Double) {
- jmsMessage.setDoubleProperty(name, (Double) value);
- } else if (value instanceof Float) {
- jmsMessage.setFloatProperty(name, (Float) value);
- } else if (value instanceof Integer) {
- jmsMessage.setIntProperty(name, (Integer) value);
- } else if (value instanceof Long) {
- jmsMessage.setLongProperty(name, (Long) value);
- } else if (value instanceof Short) {
- jmsMessage.setShortProperty(name, (Short) value);
- } else if (value instanceof String) {
- jmsMessage.setStringProperty(name, (String) value);
- } else {
- // fallback to Object
- jmsMessage.setObjectProperty(name, value);
- }
- }
-
- public static JmsMessageType discoverMessageTypeFromPayload(final Object payload) {
- JmsMessageType answer;
- // Default is a JMS Message since a body is not required
- if (payload == null) {
- answer = JmsMessageType.Message;
- } else {
- // Something was found in the body so determine
- // what type of message we need to create
- if (byte[].class.isInstance(payload)) {
- answer = JmsMessageType.Bytes;
- } else if (Map.class.isInstance(payload)) {
- answer = JmsMessageType.Map;
- } else if (Collection.class.isInstance(payload)) {
- answer = JmsMessageType.Stream;
- } else if (InputStream.class.isInstance(payload)) {
- answer = JmsMessageType.Bytes;
- } else if (ByteBuffer.class.isInstance(payload)) {
- answer = JmsMessageType.Bytes;
- } else if (File.class.isInstance(payload)) {
- answer = JmsMessageType.Bytes;
- } else if (Reader.class.isInstance(payload)) {
- answer = JmsMessageType.Text;
- } else if (String.class.isInstance(payload)) {
- answer = JmsMessageType.Text;
- } else if (CharBuffer.class.isInstance(payload)) {
- answer = JmsMessageType.Text;
- } else if (char[].class.isInstance(payload)) {
- answer = JmsMessageType.Text;
- } else if (Character.class.isInstance(payload)) {
- answer = JmsMessageType.Text;
- } else if (Serializable.class.isInstance(payload)) {
- answer = JmsMessageType.Object;
- } else {
- answer = JmsMessageType.Message;
- }
- }
- return answer;
- }
-
- public static JmsMessageType discoverJmsMessageType(Message message) {
- JmsMessageType answer;
- if (message != null) {
- if (BytesMessage.class.isInstance(message)) {
- answer = JmsMessageType.Bytes;
- } else if (MapMessage.class.isInstance(message)) {
- answer = JmsMessageType.Map;
- } else if (TextMessage.class.isInstance(message)) {
- answer = JmsMessageType.Text;
- } else if (StreamMessage.class.isInstance(message)) {
- answer = JmsMessageType.Stream;
- } else if (ObjectMessage.class.isInstance(message)) {
- answer = JmsMessageType.Object;
- } else {
- answer = JmsMessageType.Message;
- }
- } else {
- answer = JmsMessageType.Message;
- }
- return answer;
- }
-
- private static boolean hasIllegalHeaderKey(String key) {
- return key == null || key.isEmpty() || key.contains(".") || key.contains("-");
- }
-
- /**
- * Normalizes the destination name.
- * <p/>
- * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
- * was intended as <tt>queue://foo</tt>.
- *
- * @param destination the destination
- * @return the normalized destination
- */
- public static String normalizeDestinationName(String destination) {
- // do not include prefix which is the current behavior when using this method.
- return normalizeDestinationName(destination, false);
- }
-
- /**
- * Normalizes the destination name.
- * <p/>
- * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
- * was intended as <tt>queue://foo</tt>.
- *
- * @param destination the destination
- * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
- * @return the normalized destination
- */
- public static String normalizeDestinationName(String destination, boolean includePrefix) {
- if (ObjectHelper.isEmpty(destination)) {
- return destination;
- }
- if (destination.startsWith(QUEUE_PREFIX)) {
- String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
- if (includePrefix) {
- s = QUEUE_PREFIX + "//" + s;
- }
- return s;
- } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
- String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
- if (includePrefix) {
- s = TEMP_QUEUE_PREFIX + "//" + s;
- }
- return s;
- } else if (destination.startsWith(TOPIC_PREFIX)) {
- String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
- if (includePrefix) {
- s = TOPIC_PREFIX + "//" + s;
- }
- return s;
- } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
- String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
- if (includePrefix) {
- s = TEMP_TOPIC_PREFIX + "//" + s;
- }
- return s;
- } else {
- return destination;
- }
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java
deleted file mode 100644
index 3b7566f..0000000
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sjms.jms;
-
-/**
- * Strategy for applying encoding and decoding of JMS headers so they apply to
- * the JMS spec.
- */
-public interface KeyFormatStrategy {
-
- /**
- * Encodes the key before its sent as a {@link javax.jms.Message} message.
- *
- * @param key the original key
- * @return the encoded key
- */
- String encodeKey(String key);
-
- /**
- * Decodes the key after its received from a {@link javax.jms.Message}
- * message.
- *
- * @param key the encoded key
- * @return the decoded key as the original key
- */
- String decodeKey(String key);
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java
new file mode 100644
index 0000000..894ef61
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.Exchange;
+
+/**
+ * A strategy that allows custom components to plug in and perform custom logic when Camel creates a {@link javax.jms.Message} instance.
+ * <p/>
+ * For example to populate the message with custom information that is component specific and not part of the JMS specification.
+ */
+public interface MessageCreatedStrategy {
+
+ /**
+ * Callback when the JMS message has <i>just</i> been created, which allows custom modifications afterwards.
+ * @param message the JMS message that was just created
+ * @param exchange the current exchange
+ * @param session the JMS session used to create the message
+ * @param cause optional exception that occurred and should be sent as the reply instead of a regular body
+ */
+ void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index 3fa23b0..93f8648 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -32,7 +32,6 @@ import org.apache.camel.component.sjms.BatchMessage;
import org.apache.camel.component.sjms.MessageProducerResources;
import org.apache.camel.component.sjms.SjmsProducer;
import org.apache.camel.component.sjms.TransactionCommitStrategy;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
@@ -46,11 +45,6 @@ public class InOnlyProducer extends SjmsProducer {
super(endpoint);
}
- /*
- * @see org.apache.camel.component.sjms.SjmsProducer#doCreateProducerModel()
- * @return
- * @throws Exception
- */
@Override
public MessageProducerResources doCreateProducerModel() throws Exception {
MessageProducerResources answer;
@@ -75,13 +69,6 @@ public class InOnlyProducer extends SjmsProducer {
return answer;
}
- /*
- * @see
- * org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
- * @param exchange
- * @param callback
- * @throws Exception
- */
@Override
public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception {
try {
@@ -93,19 +80,18 @@ public class InOnlyProducer extends SjmsProducer {
Message message;
if (BatchMessage.class.isInstance(object)) {
BatchMessage<?> batchMessage = (BatchMessage<?>) object;
- message = JmsMessageHelper.createMessage(exchange, producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint());
+ message = getEndpoint().getBinding().makeJmsMessage(exchange, batchMessage.getPayload(), batchMessage.getHeaders(), producer.getSession(), null);
} else {
- message = JmsMessageHelper.createMessage(exchange, producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint());
+ message = getEndpoint().getBinding().makeJmsMessage(exchange, object, exchange.getIn().getHeaders(), producer.getSession(), null);
}
messages.add(message);
}
} else {
- Object payload = exchange.getIn().getBody();
- Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint());
+ Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
messages.add(message);
}
} else {
- Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), null, exchange.getIn().getHeaders(), getEndpoint());
+ Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
messages.add(message);
}
@@ -124,4 +110,5 @@ public class InOnlyProducer extends SjmsProducer {
callback.done(isSynchronous());
}
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 039daf0..1c535b6 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -36,11 +36,14 @@ import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.MessageConsumerResources;
import org.apache.camel.component.sjms.MessageProducerResources;
import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsMessage;
import org.apache.camel.component.sjms.SjmsProducer;
import org.apache.camel.component.sjms.jms.JmsConstants;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
+import org.apache.camel.spi.UuidGenerator;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
@@ -52,6 +55,17 @@ public class InOutProducer extends SjmsProducer {
private static final Map<String, Exchanger<Object>> EXCHANGERS = new ConcurrentHashMap<String, Exchanger<Object>>();
+ private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
+ private UuidGenerator uuidGenerator;
+
+ public UuidGenerator getUuidGenerator() {
+ return uuidGenerator;
+ }
+
+ public void setUuidGenerator(UuidGenerator uuidGenerator) {
+ this.uuidGenerator = uuidGenerator;
+ }
+
/**
* A pool of {@link MessageConsumerResources} objects that are the reply
* consumers.
@@ -134,6 +148,10 @@ public class InOutProducer extends SjmsProducer {
} else {
log.debug("Using {} as the reply to destination.", getNamedReplyTo());
}
+ if (uuidGenerator == null) {
+ // use the generator configured on the camel context
+ uuidGenerator = getEndpoint().getCamelContext().getUuidGenerator();
+ }
if (getConsumers() == null) {
setConsumers(new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory()));
getConsumers().setMaxActive(getConsumerCount());
@@ -185,16 +203,14 @@ public class InOutProducer extends SjmsProducer {
exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
}
- Message request = JmsMessageHelper.createMessage(exchange, producer.getSession(), getEndpoint());
+ Message request = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
- // TODO just set the correlation id don't get it from the
- // message
- String correlationId;
- if (exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class) == null) {
- correlationId = UUID.randomUUID().toString().replace("-", "");
- } else {
- correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
+ String correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
+ if (correlationId == null) {
+ // we prepend the 'Camel-' prefix so we know it was generated by us
+ correlationId = GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid();
}
+
Object responseObject = null;
Exchanger<Object> messageExchanger = new Exchanger<Object>();
JmsMessageHelper.setCorrelationId(request, correlationId);
@@ -229,8 +245,13 @@ public class InOutProducer extends SjmsProducer {
if (responseObject instanceof Throwable) {
exchange.setException((Throwable) responseObject);
} else if (responseObject instanceof Message) {
- Message response = (Message) responseObject;
- JmsMessageHelper.populateExchange(response, exchange, true, getEndpoint().getJmsKeyFormatStrategy());
+ Message message = (Message) responseObject;
+
+ SjmsMessage response = new SjmsMessage(message, consumer.getSession(), getEndpoint().getBinding());
+ // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand
+ // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access
+ // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218).
+ exchange.setOut(response);
} else {
exchange.setException(new CamelException("Unknown response type: " + responseObject));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
deleted file mode 100644
index d389886..0000000
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sjms.typeconversion;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.StringReader;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.sjms.support.JmsTestSupport;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
-
- private static final String SJMS_QUEUE_URI = "sjms:queue:start";
- private static final String MOCK_RESULT_URI = "mock:result";
- private Exchange message;
-
- @Test
- public void testJMSMessageHelperString() throws Exception {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
-
- template.sendBody(SJMS_QUEUE_URI, "Hello Camel");
- assertMockEndpointsSatisfied();
- assertTrue(String.class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperMap() throws Exception {
- getMockEndpoint(MOCK_RESULT_URI).expectedMessageCount(1);
-
- Map<Object, Object> map = new HashMap<Object, Object>();
- map.put("Hello", "Camel");
- map.put("Int", Integer.MAX_VALUE);
- map.put("Boolean", Boolean.TRUE);
- map.put(Boolean.TRUE, Long.MAX_VALUE);
-
- template.sendBody(SJMS_QUEUE_URI, map);
- assertMockEndpointsSatisfied();
- assertTrue(Map.class.isInstance(message.getIn().getBody()));
- assertEquals("Camel", message.getIn().getBody(Map.class).get("Hello"));
- assertEquals(Integer.MAX_VALUE, message.getIn().getBody(Map.class).get("Int"));
- assertEquals(Boolean.TRUE, message.getIn().getBody(Map.class).get("Boolean"));
- assertEquals(Long.MAX_VALUE, message.getIn().getBody(Map.class).get("true"));
- }
-
- @Ignore
- @Test
- public void testJMSMessageHelperCollection() throws Exception {
- // TODO: Once SJMS can accept a Collection as Body
- }
-
- @Test
- public void testJMSMessageHelperByteArray() throws Exception {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
-
- byte[] bytes = "Hello Camel".getBytes();
- template.sendBody(SJMS_QUEUE_URI, bytes);
- assertMockEndpointsSatisfied();
- assertTrue(byte[].class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperInputStream() throws Exception {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
- String p = "Hello Camel";
- InputStream is = new ByteArrayInputStream(p.getBytes());
- template.sendBody(SJMS_QUEUE_URI, is);
- assertMockEndpointsSatisfied();
- assertTrue(byte[].class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperCharBuffer() throws Exception {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
- CharBuffer cb = CharBuffer.wrap("Hello Camel");
- template.sendBody(SJMS_QUEUE_URI, cb);
- assertMockEndpointsSatisfied();
- assertTrue(String.class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperByteBuffer() throws Exception {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
- String p = "Hello Camel";
- ByteBuffer bb = ByteBuffer.wrap(p.getBytes());
- template.sendBody(SJMS_QUEUE_URI, bb);
- assertMockEndpointsSatisfied();
- assertTrue(byte[].class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperFile() throws InterruptedException, IOException {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
- String p = "Hello Camel";
- File f = File.createTempFile("tmp-test", ".txt");
- BufferedWriter bw = new BufferedWriter(new FileWriter(f));
- bw.write(p);
- bw.close();
- template.sendBody(SJMS_QUEUE_URI, f);
- assertMockEndpointsSatisfied();
- boolean resultDelete = f.delete();
- assertTrue(resultDelete);
- assertTrue(byte[].class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperReader() throws InterruptedException, IOException {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
- String p = "Hello Camel";
- File f = File.createTempFile("tmp-test", ".txt");
- BufferedWriter bw = new BufferedWriter(new FileWriter(f));
- bw.write(p);
- bw.close();
- Reader test = new BufferedReader(new FileReader(f.getAbsolutePath()));
- template.sendBody(SJMS_QUEUE_URI, test);
- assertMockEndpointsSatisfied();
- assertTrue(f.delete());
- assertTrue(String.class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperStringReader() throws InterruptedException, FileNotFoundException {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
- String p = "Hello Camel";
- StringReader test = new StringReader(p);
- template.sendBody(SJMS_QUEUE_URI, test);
- assertMockEndpointsSatisfied();
- assertTrue(String.class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperChar() throws InterruptedException, FileNotFoundException {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("H");
- char p = 'H';
- template.sendBody(SJMS_QUEUE_URI, p);
- assertMockEndpointsSatisfied();
- assertTrue(String.class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperCharacter() throws InterruptedException, FileNotFoundException {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("H");
- Character p = 'H';
- template.sendBody(SJMS_QUEUE_URI, p);
- assertMockEndpointsSatisfied();
- assertTrue(String.class.isInstance(message.getIn().getBody()));
- }
-
- @Test
- public void testJMSMessageHelperCharArray() throws InterruptedException, FileNotFoundException {
- getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
- char[] p = {'H', 'e', 'l', 'l', 'o', ' ', 'C', 'a', 'm', 'e', 'l'};
- template.sendBody(SJMS_QUEUE_URI, p);
- assertMockEndpointsSatisfied();
- assertTrue(String.class.isInstance(message.getIn().getBody()));
- }
-
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() throws Exception {
- interceptSendToEndpoint(MOCK_RESULT_URI).process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- message = exchange;
- }
- });
-
- from(SJMS_QUEUE_URI).to(MOCK_RESULT_URI);
- }
- };
- }
-}