You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/30 05:42:14 UTC
svn commit: r1378796 [2/3] - in /camel/trunk/components/camel-sjms: ./
src/main/java/org/apache/camel/component/sjms/
src/main/java/org/apache/camel/component/sjms/consumer/
src/main/java/org/apache/camel/component/sjms/jms/
src/main/java/org/apache/ca...
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java Thu Aug 30 03:42:12 2012
@@ -18,11 +18,8 @@ package org.apache.camel.component.sjms.
import java.io.Serializable;
import java.util.Collection;
-import java.util.Enumeration;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import javax.jms.BytesMessage;
@@ -35,373 +32,202 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.sjms.DefaultJmsKeyFormatStrategy;
import org.apache.camel.component.sjms.IllegalHeaderException;
-import org.apache.camel.impl.DefaultMessage;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.ObjectHelper;
-
+import org.apache.camel.component.sjms.KeyFormatStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE;
-import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX;
-import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX;
-import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
-
/**
* Utility class for {@link javax.jms.Message}.
- *
- * @version
+ *
+ * @author sully6768
*/
public final class JmsMessageHelper {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
-
- private JmsMessageHelper() {
- }
-
- public static Exchange createExchange(Message message, Endpoint endpoint) {
- Exchange exchange = endpoint.createExchange();
- return populateExchange(message, exchange, false);
- }
-
- @SuppressWarnings("unchecked")
- public static Exchange populateExchange(Message message, Exchange exchange, boolean out) {
- try {
- JmsMessageHelper.setJmsMessageHeaders(message, exchange, out);
- if (message != null) {
- // convert to JMS Message of the given type
-
- DefaultMessage bodyMessage = null;
- if (out) {
- bodyMessage = (DefaultMessage) exchange.getOut();
- } else {
- bodyMessage = (DefaultMessage) exchange.getIn();
- }
- switch (JmsMessageHelper.discoverType(message)) {
- case Bytes:
- BytesMessage bytesMessage = (BytesMessage) message;
- if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
- LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
- return null;
- }
- byte[] result = new byte[(int) bytesMessage.getBodyLength()];
- bytesMessage.readBytes(result);
- bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Bytes);
- bodyMessage.setBody(result);
- break;
- case Map:
- HashMap<String, Object> body = new HashMap<String, Object>();
- MapMessage mapMessage = (MapMessage) message;
- Enumeration<String> names = mapMessage.getMapNames();
- while (names.hasMoreElements()) {
- String key = names.nextElement();
- Object value = mapMessage.getObject(key);
- body.put(key, value);
- }
- bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Map);
- bodyMessage.setBody(body);
- break;
- case Object:
- ObjectMessage objMsg = (ObjectMessage) message;
- bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Object);
- bodyMessage.setBody(objMsg.getObject());
- break;
- case Text:
- TextMessage textMsg = (TextMessage) message;
- bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Text);
- bodyMessage.setBody(textMsg.getText());
- break;
- case Message:
- default:
- // Do nothing. Only set the headers for an empty message
- bodyMessage.setBody(message);
- break;
- }
- }
- } catch (Exception e) {
- exchange.setException(e);
- }
- return exchange;
- }
/**
- * Removes the property from the JMS message.
- *
- * @param jmsMessage the JMS message
- * @param name name of the property to remove
- * @return the old value of the property or <tt>null</tt> if not exists
- * @throws JMSException can be thrown
+ * Set by the publishing Client
*/
- public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException {
- // check if the property exists
- if (!jmsMessage.propertyExists(name)) {
- return null;
- }
-
- Object answer = null;
-
- // store the properties we want to keep in a temporary map
- // as the JMS API is a bit strict as we are not allowed to
- // clear a single property, but must clear them all and redo
- // the properties
- Map<String, Object> map = new LinkedHashMap<String, Object>();
- Enumeration<?> en = jmsMessage.getPropertyNames();
- while (en.hasMoreElements()) {
- String key = (String) en.nextElement();
- if (name.equals(key)) {
- answer = key;
- } else {
- map.put(key, jmsMessage.getObjectProperty(key));
- }
- }
-
- // redo the properties to keep
- jmsMessage.clearProperties();
- for (Entry<String, Object> entry : map.entrySet()) {
- jmsMessage.setObjectProperty(entry.getKey(), entry.getValue());
- }
-
- return answer;
- }
-
+ public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
/**
- * Tests whether a given property with the name exists
- *
- * @param jmsMessage the JMS message
- * @param name name of the property to test if exists
- * @return <tt>true</tt> if the property exists, <tt>false</tt> if not.
- * @throws JMSException can be thrown
+ * Set on the send or publish event
*/
- public static boolean hasProperty(Message jmsMessage, String name) throws JMSException {
- Enumeration<?> en = jmsMessage.getPropertyNames();
- while (en.hasMoreElements()) {
- String key = (String) en.nextElement();
- if (name.equals(key)) {
- return true;
- }
- }
- return false;
- }
-
+ public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
/**
- * Sets the property on the given JMS message.
- *
- * @param jmsMessage the JMS message
- * @param name name of the property to set
- * @param value the value
- * @throws JMSException can be thrown
+ * Set on the send or publish event
*/
- public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
- if (value == null) {
- return;
- }
- if (value instanceof Byte) {
- jmsMessage.setByteProperty(name, (Byte) value);
- } else if (value instanceof Boolean) {
- jmsMessage.setBooleanProperty(name, (Boolean) value);
- } else if (value instanceof Double) {
- jmsMessage.setDoubleProperty(name, (Double) value);
- } else if (value instanceof Float) {
- jmsMessage.setFloatProperty(name, (Float) value);
- } else if (value instanceof Integer) {
- jmsMessage.setIntProperty(name, (Integer) value);
- } else if (value instanceof Long) {
- jmsMessage.setLongProperty(name, (Long) value);
- } else if (value instanceof Short) {
- jmsMessage.setShortProperty(name, (Short) value);
- } else if (value instanceof String) {
- jmsMessage.setStringProperty(name, (String) value);
- } else {
- // fallback to Object
- jmsMessage.setObjectProperty(name, value);
- }
- }
-
+ public static final String JMS_DESTINATION = "JMSDestination";
/**
- * Sets the correlation id on the JMS message.
- * <p/>
- * Will ignore exception thrown
- *
- * @param message the JMS message
- * @param correlationId the correlation id
+ * Set on the send or publish event
*/
- public static void setCorrelationId(Message message, String correlationId) {
- try {
- message.setJMSCorrelationID(correlationId);
- } catch (JMSException e) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Error setting the correlationId: {}", correlationId);
- }
- }
- }
-
+ public static final String JMS_EXPIRATION = "JMSExpiration";
/**
- * Normalizes the destination name, by removing any leading queue or topic prefixes.
- *
- * @param destination the destination
- * @return the normalized destination
- */
- public static String normalizeDestinationName(String destination) {
- if (ObjectHelper.isEmpty(destination)) {
- return destination;
- }
- if (destination.startsWith(QUEUE_PREFIX)) {
- return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
- } else if (destination.startsWith(TOPIC_PREFIX)) {
- return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
- } else {
- return destination;
- }
- }
-
+ * Set on the send or publish event
+ */
+ public static final String JMS_MESSAGE_ID = "JMSMessageID";
/**
- * Sets the JMSReplyTo on the message.
- *
- * @param message the message
- * @param replyTo the reply to destination
+ * Set on the send or publish event
*/
- public static void setJMSReplyTo(Message message, Destination replyTo) {
- try {
- message.setJMSReplyTo(replyTo);
- } catch (Exception e) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
- }
- }
- }
-
+ public static final String JMS_PRIORITY = "JMSPriority";
/**
- * Gets the JMSReplyTo from the message.
- *
- * @param message the message
- * @return the reply to, can be <tt>null</tt>
+ * A redelivery flag set by the JMS provider
*/
- public static Destination getJMSReplyTo(Message message) {
- try {
- return message.getJMSReplyTo();
- } catch (Exception e) {
- // ignore due OracleAQ does not support accessing JMSReplyTo
- }
-
- return null;
- }
-
+ public static final String JMS_REDELIVERED = "JMSRedelivered";
/**
- * Gets the JMSType from the message.
- *
- * @param message the message
- * @return the type, can be <tt>null</tt>
+ * The JMS Reply To {@link Destination} set by the publishing Client
*/
- public static String getJMSType(Message message) {
- try {
- return message.getJMSType();
- } catch (Exception e) {
- // ignore due OracleAQ does not support accessing JMSType
- }
+ public static final String JMS_REPLY_TO = "JMSReplyTo";
+ /**
+ * Set on the send or publish event
+ */
+ public static final String JMS_TIMESTAMP = "JMSTimestamp";
+ /**
+ * Set by the publishing Client
+ */
+ public static final String JMS_TYPE = "JMSType";
- return null;
+ private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+
+ private JmsMessageHelper() {
}
- /**
- * Gets the JMSRedelivered from the message.
- *
- * @param message the message
- * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, <tt>null</tt> if not able to determine
- */
- public static Boolean getJMSRedelivered(Message message) {
+ @SuppressWarnings("unchecked")
+ public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws Exception {
+ Message answer = null;
+ JmsMessageType messageType = JmsMessageHelper.discoverPayloadType(payload);
try {
- return message.getJMSRedelivered();
+
+ switch (messageType) {
+ case Bytes:
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes((byte[])payload);
+ answer = bytesMessage;
+ break;
+ case Map:
+ MapMessage mapMessage = session.createMapMessage();
+ Map<String, Object> objMap = (Map<String, Object>)payload;
+ Set<String> keys = objMap.keySet();
+ for (String key : keys) {
+ Object value = objMap.get(key);
+ mapMessage.setObject(key, value);
+ }
+ answer = mapMessage;
+ break;
+ case Object:
+ ObjectMessage objectMessage = session.createObjectMessage();
+ objectMessage.setObject((Serializable)payload);
+ answer = objectMessage;
+ break;
+ case Text:
+ TextMessage textMessage = session.createTextMessage();
+ textMessage.setText((String)payload);
+ answer = textMessage;
+ break;
+ default:
+ break;
+ }
} catch (Exception e) {
- // ignore if JMS broker do not support this
+ LOGGER.error("Error creating a message of type: " + messageType.toString());
+ throw e;
}
-
- return null;
+ if (messageHeaders != null && !messageHeaders.isEmpty()) {
+ answer = JmsMessageHelper.setJmsMessageHeaders(answer, messageHeaders, keyFormatStrategy);
+ }
+ return answer;
}
/**
- * Sets the JMSDeliveryMode on the message.
- *
- * @param exchange the exchange
- * @param message the message
- * @param deliveryMode the delivery mode, either as a String or integer
- * @throws javax.jms.JMSException is thrown if error setting the delivery mode
+ * Adds or updates the {@link Message} headers. Header names and values are
+ * checked for JMS 1.1 compliance.
+ *
+ * @param jmsMessage the {@link Message} to add or update the headers on
+ * @param messageHeaders a {@link Map} of String/Object pairs
+ * @param keyFormatStrategy the {@link KeyFormatStrategy} used to
+ * format keys in a JMS 1.1 compliant manner. If null the
+ * {@link DefaultJmsKeyFormatStrategy} will be used.
+ * @return {@link Message}
+ * @throws IllegalHeaderException if a header value has an unsupported type or cannot be set on the message
*/
- public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode) throws JMSException {
- Integer mode = null;
-
- if (deliveryMode instanceof String) {
- String s = (String) deliveryMode;
- if ("PERSISTENT".equalsIgnoreCase(s)) {
- mode = DeliveryMode.PERSISTENT;
- } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
- mode = DeliveryMode.NON_PERSISTENT;
- } else {
- // it may be a number in the String so try that
- Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
- if (value != null) {
- mode = value;
- } else {
- throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode);
- }
- }
+ public static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException {
+ // Support for the null keyFormatStrategy
+ KeyFormatStrategy localKeyFormatStrategy = null;
+ if (keyFormatStrategy == null) {
+ localKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
} else {
- // fallback and try to convert to a number
- Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
- if (value != null) {
- mode = value;
- }
- }
-
- if (mode != null) {
- message.setJMSDeliveryMode(mode);
- message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode);
+ localKeyFormatStrategy = keyFormatStrategy;
}
- }
-
- public static Message setJmsMessageHeaders(final Exchange exchange, final Message jmsMessage) throws Exception {
- Map<String, Object> headers = new HashMap<String, Object>(exchange.getIn().getHeaders());
+ Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
Set<String> keys = headers.keySet();
for (String headerName : keys) {
Object headerValue = headers.get(headerName);
- if (headerName.equalsIgnoreCase("JMSCorrelationID")) {
- jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
- } else if (headerName.equalsIgnoreCase("JMSReplyTo") && headerValue != null) {
+
+ if (headerName.equalsIgnoreCase(JMS_CORRELATION_ID)) {
+ if (headerValue == null) {
+ // Value can be null but we can't cast a null to a String
+ // so pass null to the setter
+ setCorrelationId(jmsMessage, null);
+ } else if (headerValue instanceof String) {
+ setCorrelationId(jmsMessage, (String)headerValue);
+ } else {
+ throw new IllegalHeaderException("The " + JMS_CORRELATION_ID + " must either be a String or null. Found: " + headerValue.getClass().getName());
+ }
+ } else if (headerName.equalsIgnoreCase(JMS_REPLY_TO)) {
if (headerValue instanceof String) {
- // if the value is a String we must normalize it first
- headerValue = (String) headerValue;
+ // FIXME Setting the reply to appears broken. walk back
+ // through it. If the value is a String we must normalize it
+ // first
} else {
// TODO write destination converter
// Destination replyTo =
- // ExchangeHelper.convertToType(exchange, Destination.class,
+ // ExchangeHelper.convertToType(exchange,
+ // Destination.class,
// headerValue);
// jmsMessage.setJMSReplyTo(replyTo);
}
- } else if (headerName.equalsIgnoreCase("JMSType")) {
- jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
- } else if (headerName.equalsIgnoreCase("JMSPriority")) {
- jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
- } else if (headerName.equalsIgnoreCase("JMSDeliveryMode")) {
- JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
- } else if (headerName.equalsIgnoreCase("JMSExpiration")) {
- jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
+ } else if (headerName.equalsIgnoreCase(JMS_TYPE)) {
+ if (headerValue == null) {
+ // Value can be null but we can't cast a null to a String
+ // so pass null to the setter
+ setMessageType(jmsMessage, null);
+ } else if (headerValue instanceof String) {
+ // Not null but is a String
+ setMessageType(jmsMessage, (String)headerValue);
+ } else {
+ throw new IllegalHeaderException("The " + JMS_TYPE + " must either be a String or null. Found: " + headerValue.getClass().getName());
+ }
+ } else if (headerName.equalsIgnoreCase(JMS_PRIORITY)) {
+ if (headerValue instanceof Integer) {
+ try {
+ jmsMessage.setJMSPriority((Integer)headerValue);
+ } catch (JMSException e) {
+ throw new IllegalHeaderException("Failed to set the " + JMS_PRIORITY + " header. Cause: " + e.getLocalizedMessage(), e);
+ }
+ } else {
+ throw new IllegalHeaderException("The " + JMS_PRIORITY + " must be a Integer. Type found: " + headerValue.getClass().getName());
+ }
+ } else if (headerName.equalsIgnoreCase(JMS_DELIVERY_MODE)) {
+ try {
+ JmsMessageHelper.setJMSDeliveryMode(jmsMessage, headerValue);
+ } catch (JMSException e) {
+ throw new IllegalHeaderException("Failed to set the " + JMS_DELIVERY_MODE + " header. Cause: " + e.getLocalizedMessage(), e);
+ }
+ } else if (headerName.equalsIgnoreCase(JMS_EXPIRATION)) {
+ if (headerValue instanceof Long) {
+ try {
+ jmsMessage.setJMSExpiration((Long)headerValue);
+ } catch (JMSException e) {
+ throw new IllegalHeaderException("Failed to set the " + JMS_EXPIRATION + " header. Cause: " + e.getLocalizedMessage(), e);
+ }
+ } else {
+ throw new IllegalHeaderException("The " + JMS_EXPIRATION + " must be a Long. Type found: " + headerValue.getClass().getName());
+ }
} else {
- // The following properties are set by the MessageProducer:
- // JMSDestination
- // The following are set on the underlying JMS provider:
- // JMSMessageID, JMSTimestamp, JMSRedelivered
- // log at trace level to not spam log
LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
- if (headerName.equalsIgnoreCase("JMSDestination")
- || headerName.equalsIgnoreCase("JMSMessageID")
- || headerName.equalsIgnoreCase("JMSTimestamp")
+ if (headerName.equalsIgnoreCase(JMS_DESTINATION) || headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || headerName.equalsIgnoreCase("JMSTimestamp")
|| headerName.equalsIgnoreCase("JMSRedelivered")) {
- // The following properties are set by the MessageProducer:
+ // The following properties are set by the
+ // MessageProducer:
// JMSDestination
// The following are set on the underlying JMS provider:
// JMSMessageID, JMSTimestamp, JMSRedelivered
@@ -409,188 +235,151 @@ public final class JmsMessageHelper {
LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
} else {
if (!(headerValue instanceof JmsMessageType)) {
- String encodedName = new DefaultJmsKeyFormatStrategy().encodeKey(headerName);
- JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+ String encodedName = localKeyFormatStrategy.encodeKey(headerName);
+ try {
+ JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+ } catch (JMSException e) {
+ throw new IllegalHeaderException("Failed to set the header " + encodedName + " header. Cause: " + e.getLocalizedMessage(), e);
+ }
}
}
- }
+ // }
+ }
}
return jmsMessage;
}
-
- public static JmsMessageType discoverType(Message value) throws Exception {
- JmsMessageType answer = null;
- if (value != null) {
- if (Message.class.isInstance(value)) {
- if (BytesMessage.class.isInstance(value)) {
- answer = JmsMessageType.Bytes;
- } else if (MapMessage.class.isInstance(value)) {
- answer = JmsMessageType.Map;
- } else if (TextMessage.class.isInstance(value)) {
- answer = JmsMessageType.Text;
- } else if (ObjectMessage.class.isInstance(value)) {
- answer = JmsMessageType.Object;
+
+ /**
+ * Sets the JMSDeliveryMode on the message.
+ *
+ * @param message the message
+ * @param deliveryMode the delivery mode, either as a String ("PERSISTENT",
+ * "NON_PERSISTENT" or a numeric string) or as an Integer
+ * @throws javax.jms.JMSException is thrown if error setting the delivery
+ * mode
+ */
+ public static void setJMSDeliveryMode(Message message, Object deliveryMode) throws JMSException {
+ Integer mode = null;
+
+ if (deliveryMode instanceof String) {
+ String s = (String)deliveryMode;
+ if ("PERSISTENT".equalsIgnoreCase(s)) {
+ mode = DeliveryMode.PERSISTENT;
+ } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
+ mode = DeliveryMode.NON_PERSISTENT;
+ } else {
+ // it may be a number in the String so try that
+ Integer value = null;
+ try {
+ value = Integer.valueOf(s);
+ } catch (NumberFormatException e) {
+ // Do nothing. The error handler below is sufficient
+ }
+ if (value != null) {
+ mode = value;
} else {
- answer = JmsMessageType.Message;
+ throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode);
}
}
+ } else if (deliveryMode instanceof Integer) {
+ // already an Integer, so use the value as-is
+ mode = (Integer)deliveryMode;
+ } else {
+ throw new IllegalArgumentException("Unable to convert the given delivery mode of type " + deliveryMode.getClass().getName() + " with value: " + deliveryMode);
+ }
+
+ if (mode != null) {
+ message.setJMSDeliveryMode(mode);
}
- return answer;
}
-
- public static JmsMessageType discoverType(final Exchange exchange) {
- JmsMessageType answer = (JmsMessageType) exchange.getIn().getHeader(JMS_MESSAGE_TYPE);
- if (answer == null) {
- final Object value = exchange.getIn().getBody();
- if (value != null) {
- if (Byte[].class.isInstance(value)) {
- answer = JmsMessageType.Bytes;
- } else if (Collection.class.isInstance(value)) {
- answer = JmsMessageType.Map;
- } else if (String.class.isInstance(value)) {
- answer = JmsMessageType.Text;
- } else if (Serializable.class.isInstance(value)) {
- answer = JmsMessageType.Object;
- } else {
- answer = JmsMessageType.Message;
- }
+
+ /**
+ * Sets the JMS type on the JMS message.
+ * <p/>
+ * Any {@link JMSException} thrown is ignored (logged at debug level only)
+ *
+ * @param message the JMS message
+ * @param type the JMS message type, may be <tt>null</tt>
+ */
+ public static void setMessageType(Message message, String type) {
+ try {
+ message.setJMSType(type);
+ } catch (JMSException e) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Error setting the message type: {}", type);
}
}
- return answer;
}
- @SuppressWarnings("unchecked")
- public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out)
- throws JMSException {
- HashMap<String, Object> headers = new HashMap<String, Object>();
- if (jmsMessage != null) {
- // lets populate the standard JMS message headers
- try {
- headers.put(
- JmsMessageHeaderType.JMSCorrelationID.toString(),
- jmsMessage.getJMSCorrelationID());
- headers.put(
- JmsMessageHeaderType.JMSDeliveryMode.toString(),
- jmsMessage.getJMSDeliveryMode());
- headers.put(
- JmsMessageHeaderType.JMSDestination.toString(),
- jmsMessage.getJMSDestination());
- headers.put(
- JmsMessageHeaderType.JMSExpiration.toString(),
- jmsMessage.getJMSExpiration());
- headers.put(
- JmsMessageHeaderType.JMSMessageID.toString(),
- jmsMessage.getJMSMessageID());
- headers.put(
- JmsMessageHeaderType.JMSPriority.toString(),
- jmsMessage.getJMSPriority());
- headers.put(
- JmsMessageHeaderType.JMSRedelivered.toString(),
- jmsMessage.getJMSRedelivered());
- headers.put(
- JmsMessageHeaderType.JMSTimestamp.toString(),
- jmsMessage.getJMSTimestamp());
- headers.put(
- JmsMessageHeaderType.JMSReplyTo.toString(),
- JmsMessageHelper.getJMSReplyTo(jmsMessage));
- headers.put(
- JmsMessageHeaderType.JMSType.toString(),
- JmsMessageHelper.getJMSType(jmsMessage));
-
- // this works around a bug in the ActiveMQ property handling
- headers.put(
- JmsMessageHeaderType.JMSXGroupID.toString(),
- jmsMessage.getStringProperty(JmsMessageHeaderType.JMSXGroupID.toString()));
- } catch (JMSException e) {
- throw new RuntimeCamelException(e);
- }
-
- for (Enumeration<String> enumeration = jmsMessage
- .getPropertyNames(); enumeration.hasMoreElements();) {
- String key = enumeration.nextElement();
- if (hasIllegalHeaderKey(key)) {
- throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value");
- }
- Object value = jmsMessage.getObjectProperty(key);
- headers.put(key, value);
+ /**
+ * Sets the correlation id on the JMS message.
+ * <p/>
+ * Will ignore exception thrown
+ *
+ * @param message the JMS message
+ * @param correlationId the correlation id
+ */
+ public static void setCorrelationId(Message message, String correlationId) {
+ try {
+ message.setJMSCorrelationID(correlationId);
+ } catch (JMSException e) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Error setting the correlationId: {}", correlationId);
}
}
- if (out) {
- exchange.getOut().setHeaders(headers);
+ }
+
+ /**
+ * Sets the property on the given JMS message.
+ *
+ * @param jmsMessage the JMS message
+ * @param name name of the property to set
+ * @param value the value
+ * @throws JMSException can be thrown
+ */
+ public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
+ if (value == null) {
+ return;
+ }
+ if (value instanceof Byte) {
+ jmsMessage.setByteProperty(name, (Byte)value);
+ } else if (value instanceof Boolean) {
+ jmsMessage.setBooleanProperty(name, (Boolean)value);
+ } else if (value instanceof Double) {
+ jmsMessage.setDoubleProperty(name, (Double)value);
+ } else if (value instanceof Float) {
+ jmsMessage.setFloatProperty(name, (Float)value);
+ } else if (value instanceof Integer) {
+ jmsMessage.setIntProperty(name, (Integer)value);
+ } else if (value instanceof Long) {
+ jmsMessage.setLongProperty(name, (Long)value);
+ } else if (value instanceof Short) {
+ jmsMessage.setShortProperty(name, (Short)value);
+ } else if (value instanceof String) {
+ jmsMessage.setStringProperty(name, (String)value);
} else {
- exchange.getIn().setHeaders(headers);
+ // fallback to Object
+ jmsMessage.setObjectProperty(name, value);
}
- return exchange;
- }
-
- public static Message createMessage(Exchange exchange, Session session) throws Exception {
- return createMessage(exchange, session, false);
}
-
- @SuppressWarnings("unchecked")
- public static Message createMessage(Exchange exchange, Session session, boolean out) throws Exception {
- Message answer = null;
- Object body = null;
- try {
- if (out && exchange.getOut().getBody() != null) {
- body = exchange.getOut().getBody();
- } else {
- body = exchange.getIn().getBody();
- }
- JmsMessageType messageType = JmsMessageHelper.discoverType(exchange);
- switch (messageType) {
- case Bytes:
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes((byte[])body);
- answer = bytesMessage;
- break;
- case Map:
- MapMessage mapMessage = session.createMapMessage();
- Map<String, Object> objMap = (Map<String, Object>)body;
- Set<String> keys = objMap.keySet();
- for (String key : keys) {
- Object value = objMap.get(key);
- mapMessage.setObject(key, value);
- }
- answer = mapMessage;
- break;
- case Object:
- ObjectMessage objectMessage = session.createObjectMessage();
- objectMessage.setObject((Serializable)body);
- answer = objectMessage;
- break;
- case Text:
- TextMessage textMessage = session.createTextMessage();
- textMessage.setText((String)body);
- answer = textMessage;
- break;
- default:
- break;
+ public static JmsMessageType discoverPayloadType(Object payload) {
+ JmsMessageType answer = null;
+ if (payload != null) {
+ if (Byte[].class.isInstance(payload)) {
+ answer = JmsMessageType.Bytes;
+ } else if (Collection.class.isInstance(payload)) {
+ answer = JmsMessageType.Map;
+ } else if (String.class.isInstance(payload)) {
+ answer = JmsMessageType.Text;
+ } else if (Serializable.class.isInstance(payload)) {
+ answer = JmsMessageType.Object;
+ } else {
+ answer = JmsMessageType.Message;
}
- } catch (Exception e) {
- LOGGER.error("TODO Auto-generated catch block", e);
- throw e;
+ } else {
+ answer = JmsMessageType.Message;
}
-
- answer = JmsMessageHelper.setJmsMessageHeaders(exchange, answer);
return answer;
}
-
- private static boolean hasIllegalHeaderKey(String key) {
- if (key == null) {
- return true;
- }
- if (key.equals("")) {
- return true;
- }
- if (key.indexOf(".") > -1) {
- return true;
- }
- if (key.indexOf("-") > -1) {
- return true;
- }
- return false;
- }
-
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java Thu Aug 30 03:42:12 2012
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.sjms.producer;
+import java.util.ArrayList;
+import java.util.List;
+
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -23,32 +26,40 @@ import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.BatchMessage;
import org.apache.camel.component.sjms.SjmsEndpoint;
import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
/**
- * The InOnlyProducer is responsible for publishing messages to the JMS
- * {@link Destination} for the value specified in the destinationName field.
+ * A Camel Producer that provides the InOnly Exchange pattern.
*/
public class InOnlyProducer extends SjmsProducer {
-
+
public InOnlyProducer(SjmsEndpoint endpoint) {
super(endpoint);
}
-
+
/*
* @see org.apache.camel.component.sjms.SjmsProducer#doCreateProducerModel()
- *
* @return
* @throws Exception
*/
+ @Override
public MessageProducerResources doCreateProducerModel() throws Exception {
Connection conn = getConnectionResource().borrowConnection();
+ TransactionCommitStrategy commitStrategy = null;
Session session = null;
if (isEndpointTransacted()) {
+ if (this.getCommitStrategy() != null) {
+ commitStrategy = this.getCommitStrategy();
+ } else {
+ commitStrategy = new DefaultTransactionCommitStrategy();
+ }
session = conn.createSession(true, getAcknowledgeMode());
} else {
session = conn.createSession(false, getAcknowledgeMode());
@@ -60,20 +71,48 @@ public class InOnlyProducer extends Sjms
messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
}
getConnectionResource().returnConnection(conn);
- return new MessageProducerResources(session, messageProducer);
+ return new MessageProducerResources(session, messageProducer, commitStrategy);
}
-
+
+ /*
+ * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
+ *
+ * @param exchange
+ * @param callback
+ * @throws Exception
+ */
@Override
public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
+ List<Message> messages = new ArrayList<Message>();
+ MessageProducerResources producer = getProducers().borrowObject();
if (getProducers() != null) {
- MessageProducerResources producer = getProducers().borrowObject();
-
+ if (exchange.getIn().getBody() != null) {
+ if (exchange.getIn().getBody() instanceof List) {
+ List<?> payload = (List<?>)exchange.getIn().getBody();
+ for (Object object : payload) {
+ Message message = null;
+ if (BatchMessage.class.isInstance(object)) {
+ BatchMessage<?> batchMessage = (BatchMessage<?>)object;
+ message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
+ .getJmsKeyFormatStrategy());
+ } else {
+ message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+ }
+ messages.add(message);
+ }
+ } else {
+ Object payload = exchange.getIn().getBody();
+ Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+ messages.add(message);
+ }
+ }
+
if (isEndpointTransacted()) {
- exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+ exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
+ }
+ for (Message message : messages) {
+ producer.getMessageProducer().send(message);
}
-
- Message message = JmsMessageHelper.createMessage(exchange, producer.getSession());
- producer.getMessageProducer().send(message);
getProducers().returnObject(producer);
callback.done(isSynchronous());
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Thu Aug 30 03:42:12 2012
@@ -39,44 +39,38 @@ import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.SjmsEndpoint;
import org.apache.camel.component.sjms.SjmsProducer;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsMessageExchangeHelper;
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
import org.apache.camel.component.sjms.jms.ObjectPool;
import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
import org.apache.camel.util.ObjectHelper;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * TODO Add Class documentation for InOutProducer
- *
+ * A Camel Producer that provides the InOut Exchange pattern.
*/
public class InOutProducer extends SjmsProducer {
/**
* We use the {@link ReadWriteLock} to manage the {@link TreeMap} in place
* of a {@link ConcurrentMap} because due to significant performance gains.
- *
* TODO Externalize the Exchanger Map to a store object
*/
private static Map<String, Exchanger<Object>> exchangerMap = new TreeMap<String, Exchanger<Object>>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
-
-
+
/**
- * A pool of {@link MessageConsumerResource} objects that are the
- * reply consumers.
- *
- * TODO Add Class documentation for MessageProducerPool
+ * A pool of {@link MessageConsumerResource} objects that are the reply
+ * consumers.
+ * TODO Add Class documentation for MessageProducerPool
* TODO Externalize
- *
*/
protected class MessageConsumerPool extends ObjectPool<MessageConsumerResource> {
/**
* TODO Add Constructor Javadoc
- *
+ *
* @param poolSize
*/
public MessageConsumerPool(int poolSize) {
@@ -112,32 +106,32 @@ public class InOutProducer extends SjmsP
} catch (Exception e) {
ObjectHelper.wrapRuntimeCamelException(e);
}
-
+
}
});
MessageConsumerResource mcm = new MessageConsumerResource(session, messageConsumer, replyToDestination);
return mcm;
}
-
+
@Override
protected void destroyObject(MessageConsumerResource model) throws Exception {
if (model.getMessageConsumer() != null) {
model.getMessageConsumer().close();
}
-
+
if (model.getSession() != null) {
if (model.getSession().getTransacted()) {
try {
model.getSession().rollback();
} catch (Exception e) {
- // Do nothing. Just make sure we are cleaned up
+ // Do nothing. Just make sure we are cleaned up
}
}
model.getSession().close();
}
}
}
-
+
/**
* TODO Add Class documentation for MessageConsumerResource
*/
@@ -153,6 +147,7 @@ public class InOutProducer extends SjmsP
* @param messageConsumer
*/
public MessageConsumerResource(Session session, MessageConsumer messageConsumer, Destination replyToDestination) {
+ super();
this.session = session;
this.messageConsumer = messageConsumer;
this.replyToDestination = replyToDestination;
@@ -161,7 +156,7 @@ public class InOutProducer extends SjmsP
public Session getSession() {
return session;
}
-
+
public MessageConsumer getMessageConsumer() {
return messageConsumer;
}
@@ -171,30 +166,6 @@ public class InOutProducer extends SjmsP
}
}
- protected class InOutResponseContainer {
- private final Exchange exchange;
- private final AsyncCallback callback;
-
- /**
- *
- * @param exchange
- * @param callback
- */
- public InOutResponseContainer(Exchange exchange, AsyncCallback callback) {
- this.exchange = exchange;
- this.callback = callback;
- }
-
- public Exchange getExchange() {
- return exchange;
- }
-
- public AsyncCallback getCallback() {
- return callback;
- }
- }
-
-
protected class InternalTempDestinationListener implements MessageListener {
private final Logger tempLogger = LoggerFactory.getLogger(InternalTempDestinationListener.class);
private Exchanger<Object> exchanger;
@@ -205,6 +176,7 @@ public class InOutProducer extends SjmsP
* @param exchanger
*/
public InternalTempDestinationListener(Exchanger<Object> exchanger) {
+ super();
this.exchanger = exchanger;
}
@@ -222,24 +194,20 @@ public class InOutProducer extends SjmsP
}
}
-
+
private MessageConsumerPool consumers;
-
+
public InOutProducer(SjmsEndpoint endpoint) {
super(endpoint);
endpoint.getConsumerCount();
}
-
+
@Override
protected void doStart() throws Exception {
if (ObjectHelper.isEmpty(getNamedReplyTo())) {
- if (log.isDebugEnabled()) {
- log.debug("No reply to destination is defined. Using temporary destinations.");
- }
+ log.debug("No reply to destination is defined. Using temporary destinations.");
} else {
- if (log.isDebugEnabled()) {
- log.debug("Using {} as the reply to destination.", getNamedReplyTo());
- }
+ log.debug("Using {} as the reply to destination.", getNamedReplyTo());
}
if (getConsumers() == null) {
setConsumers(new MessageConsumerPool(getConsumerCount()));
@@ -247,7 +215,7 @@ public class InOutProducer extends SjmsP
}
super.doStart();
}
-
+
@Override
protected void doStop() throws Exception {
super.doStop();
@@ -256,7 +224,7 @@ public class InOutProducer extends SjmsP
setConsumers(null);
}
}
-
+
@Override
public MessageProducerResources doCreateProducerModel() throws Exception {
Connection conn = getConnectionResource().borrowConnection();
@@ -275,13 +243,13 @@ public class InOutProducer extends SjmsP
getConnectionResource().returnConnection(conn);
return new MessageProducerResources(session, messageProducer);
}
-
- /**
- * TODO Add override javadoc
- * TODO time out is actually double as it waits for the producer and then waits for the response. Use an atomiclong to manage the countdown
- *
- * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
- *
+
+ /**
+ * TODO time out is actually double as it waits for the producer and then
+ * waits for the response. Use an atomic long to manage the countdown
+ *
+ * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange,
+ * org.apache.camel.AsyncCallback)
* @param exchange
* @param callback
* @throws Exception
@@ -292,7 +260,6 @@ public class InOutProducer extends SjmsP
MessageProducerResources producer = null;
try {
producer = getProducers().borrowObject(getResponseTimeOut());
- // producer = getProducers().borrowObject();
} catch (Exception e1) {
log.warn("The producer pool is exhausted. Consider setting producerCount to a higher value or disable the fixed size of the pool by setting fixedResourcePool=false.");
exchange.setException(new Exception("Producer Resource Pool is exhausted"));
@@ -300,11 +267,11 @@ public class InOutProducer extends SjmsP
if (producer != null) {
if (isEndpointTransacted()) {
- exchange.getUnitOfWork()
- .addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+ exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
}
- Message request = JmsMessageHelper.createMessage(exchange, producer.getSession());
+ Message request = JmsMessageExchangeHelper.createMessage(exchange, producer.getSession());
+
// TODO just set the correlation id don't get it from the
// message
String correlationId = null;
@@ -315,7 +282,7 @@ public class InOutProducer extends SjmsP
}
Object responseObject = null;
Exchanger<Object> messageExchanger = new Exchanger<Object>();
- JmsMessageHelper.setCorrelationId(request, correlationId);
+ JmsMessageExchangeHelper.setCorrelationId(request, correlationId);
try {
lock.writeLock().lock();
exchangerMap.put(correlationId, messageExchanger);
@@ -324,7 +291,7 @@ public class InOutProducer extends SjmsP
}
MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
- JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+ JmsMessageExchangeHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
consumers.returnObject(consumer);
producer.getMessageProducer().send(request);
@@ -335,8 +302,7 @@ public class InOutProducer extends SjmsP
}
try {
- responseObject = messageExchanger.exchange(null, getResponseTimeOut(),
- TimeUnit.MILLISECONDS);
+ responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
try {
lock.writeLock().lock();
@@ -357,7 +323,7 @@ public class InOutProducer extends SjmsP
exchange.setException((Throwable)responseObject);
} else if (responseObject instanceof Message) {
Message response = (Message)responseObject;
- JmsMessageHelper.populateExchange(response, exchange, true);
+ JmsMessageExchangeHelper.populateExchange(response, exchange, true);
} else {
exchange.setException(new CamelException("Unknown response type: " + responseObject));
}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
+
+/**
+ * Provides a thread safe counter to track the number of {@link Exchange}
+ * objects that have been processed.
+ *
+ */
+public class BatchTransactionCommitStrategy implements TransactionCommitStrategy {
+
+ private final AtomicInteger current = new AtomicInteger(0);
+ private final int count;
+
+ /**
+ * @param count
+ */
+ public BatchTransactionCommitStrategy(int count) {
+ super();
+ this.count = count;
+ }
+
+ @Override
+ public boolean commit(Exchange exchange) throws Exception {
+ boolean answer = false;
+ int currentVal = current.incrementAndGet();
+ if (currentVal >= count) {
+ answer = true;
+ current.set(0);
+ }
+
+ return answer;
+ }
+
+ @Override
+ public boolean rollback(Exchange exchange) throws Exception {
+ current.set(0);
+ return true;
+ }
+
+}
Copied: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java (from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java?p2=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java&p1=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java&r1=1378786&r2=1378796&rev=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java Thu Aug 30 03:42:12 2012
@@ -14,28 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.sjms;
+package org.apache.camel.component.sjms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
/**
- * Strategy for applying encoding and decoding of JMS headers so they apply to the JMS spec.
- *
- * @version
+ * The default commit strategy for all transactions.
+ *
*/
-public interface KeyFormatStrategy {
+public class DefaultTransactionCommitStrategy implements TransactionCommitStrategy {
/**
- * Encodes the key before its sent as a {@link javax.jms.Message} message.
+ * @see org.apache.camel.component.sjms.TransactionCommitStrategy#commit(org.apache.camel.Exchange)
*
- * @param key the original key
- * @return the encoded key
+ * @param exchange
+ * @return
+ * @throws Exception
*/
- String encodeKey(String key);
+ @Override
+ public boolean commit(Exchange exchange) throws Exception {
+ return true;
+ }
/**
- * Decodes the key after its received from a {@link javax.jms.Message} message.
+ * @see org.apache.camel.component.sjms.TransactionCommitStrategy#rollback(org.apache.camel.Exchange)
*
- * @param key the encoded key
- * @return the decoded key as the original key
+ * @param exchange
+ * @return
+ * @throws Exception
*/
- String decodeKey(String key);
+ @Override
+ public boolean rollback(Exchange exchange) throws Exception {
+ return true;
+ }
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java Thu Aug 30 03:42:12 2012
@@ -16,61 +16,66 @@
*/
package org.apache.camel.component.sjms.tx;
-import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
import org.apache.camel.spi.Synchronization;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * TODO Add Class documentation for SessionTransactionSynchronization
- *
+ * SessionTransactionSynchronization is called at the completion of each {@link org.apache.camel.Exchange}.
*/
public class SessionTransactionSynchronization implements Synchronization {
private Logger log = LoggerFactory.getLogger(getClass());
private Session session;
-
- public SessionTransactionSynchronization(Session session) {
+ private final TransactionCommitStrategy commitStrategy;
+
+ public SessionTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) {
this.session = session;
+ if (commitStrategy == null) {
+ this.commitStrategy = new DefaultTransactionCommitStrategy();
+ } else {
+ this.commitStrategy = commitStrategy;
+ }
}
-
- /*
- * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
- *
+
+ /**
+ * @see
+ * org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
* @param exchange
*/
@Override
public void onFailure(Exchange exchange) {
- if (log.isDebugEnabled()) {
- log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
- }
try {
- if (session != null && session.getTransacted()) {
- this.session.rollback();
+ if (commitStrategy.rollback(exchange)) {
+ log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+ if (session != null && session.getTransacted()) {
+ this.session.rollback();
+ }
}
- } catch (JMSException e) {
+ } catch (Exception e) {
log.warn("Failed to rollback the session: {}", e.getMessage());
}
}
-
- /*
- * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange)
- *
+
+ /**
+ * @see
+ * org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange
+ * )
* @param exchange
*/
@Override
public void onComplete(Exchange exchange) {
- if (log.isDebugEnabled()) {
- log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
- }
try {
- if (session != null && session.getTransacted()) {
- this.session.commit();
+ if (commitStrategy.commit(exchange)) {
+ log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+ if (session != null && session.getTransacted()) {
+ this.session.commit();
+ }
}
- } catch (JMSException e) {
+ } catch (Exception e) {
log.warn("Failed to commit the session: {}", e.getMessage());
exchange.setException(e);
}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java Thu Aug 30 03:42:12 2012
@@ -24,19 +24,19 @@ import org.apache.camel.component.sjms.s
import org.junit.Test;
-public class JmsSelectorOptionTest extends JmsTestSupport {
+public class JmsSelectorOptionTest extends JmsTestSupport {
@Test
public void testJmsMessageWithSelector() throws Exception {
MockEndpoint endpointA = getMockEndpoint("mock:a");
MockEndpoint endpointB = getMockEndpoint("mock:b");
MockEndpoint endpointC = getMockEndpoint("mock:c");
-
+
endpointA.expectedBodiesReceivedInAnyOrder("A blue car!", "A blue car, again!");
endpointA.expectedHeaderReceived("color", "blue");
endpointB.expectedHeaderReceived("color", "red");
endpointB.expectedBodiesReceived("A red car!");
-
+
endpointC.expectedBodiesReceived("Message1", "Message2");
endpointC.expectedMessageCount(2);
@@ -48,7 +48,7 @@ public class JmsSelectorOptionTest exten
template.sendBodyAndHeader("sjms:queue:hello", "Message2", "SIZE_NUMBER", 1600);
assertMockEndpointsSatisfied();
}
-
+
@Test
public void testConsumerTemplate() throws Exception {
template.sendBodyAndHeader("sjms:queue:consumer", "Message1", "SIZE_NUMBER", 1505);
@@ -70,7 +70,7 @@ public class JmsSelectorOptionTest exten
}
}
-
+
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java Thu Aug 30 03:42:12 2012
@@ -23,7 +23,7 @@ import org.apache.camel.component.sjms.s
import org.junit.Test;
/**
- * @version
+ * @version
*/
public class JmsSelectorTest extends JmsTestSupport {
@@ -46,13 +46,13 @@ public class JmsSelectorTest extends Jms
resultEndpoint.assertIsSatisfied();
}
-
+
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("sjms:test.a").to("sjms:test.b");
- from("sjms:test.b?messageSelector=cheese='y'").to("mock:result");
+ from("sjms:test.a").to("log:test-before?showAll=true").to("sjms:test.b");
+ from("sjms:test.b?messageSelector=cheese='y'").to("log:test-after?showAll=true").to("mock:result");
}
};
}
-}
\ No newline at end of file
+}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java Thu Aug 30 03:42:12 2012
@@ -24,10 +24,9 @@ import org.junit.Test;
public class SimpleJmsComponentTest extends CamelTestSupport {
-
@Test
public void testHelloWorld() throws Exception {
- SjmsComponent component = (SjmsComponent) this.context.getComponent("sjms");
+ SjmsComponent component = (SjmsComponent)this.context.getComponent("sjms");
assertNotNull(component);
}
@@ -35,8 +34,7 @@ public class SimpleJmsComponentTest exte
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- "vm://broker?broker.persistent=false&broker.useJmx=true");
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
SjmsComponent component = new SjmsComponent();
component.setConnectionFactory(connectionFactory);
getContext().addComponent("sjms", component);
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java Thu Aug 30 03:42:12 2012
@@ -28,7 +28,7 @@ import org.junit.Test;
public class SjmsEndpointNameOverrideTest extends CamelTestSupport {
private static final String BEAN_NAME = "not-sjms";
-
+
@Override
protected boolean useJmx() {
return true;
@@ -39,7 +39,7 @@ public class SjmsEndpointNameOverrideTes
Endpoint endpoint = context.getEndpoint(BEAN_NAME + ":test");
assertNotNull(endpoint);
assertTrue(endpoint instanceof SjmsEndpoint);
- SjmsEndpoint sjms = (SjmsEndpoint) endpoint;
+ SjmsEndpoint sjms = (SjmsEndpoint)endpoint;
assertEquals(sjms.getEndpointUri(), BEAN_NAME + "://queue:test");
assertEquals(sjms.createExchange().getPattern(), ExchangePattern.InOnly);
}
@@ -68,8 +68,7 @@ public class SjmsEndpointNameOverrideTes
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- "vm://broker?broker.persistent=false&broker.useJmx=false");
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false");
SjmsComponent component = new SjmsComponent();
component.setMaxConnections(1);
component.setConnectionFactory(connectionFactory);
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java Thu Aug 30 03:42:12 2012
@@ -26,7 +26,7 @@ import org.apache.camel.test.junit4.Came
import org.junit.Test;
public class SjmsEndpointTest extends CamelTestSupport {
-
+
@Override
protected boolean useJmx() {
return true;
@@ -37,7 +37,7 @@ public class SjmsEndpointTest extends Ca
Endpoint endpoint = context.getEndpoint("sjms:test");
assertNotNull(endpoint);
assertTrue(endpoint instanceof SjmsEndpoint);
- SjmsEndpoint sjms = (SjmsEndpoint) endpoint;
+ SjmsEndpoint sjms = (SjmsEndpoint)endpoint;
assertEquals(sjms.getEndpointUri(), "sjms://queue:test");
assertEquals(sjms.createExchange().getPattern(), ExchangePattern.InOnly);
}
@@ -60,7 +60,7 @@ public class SjmsEndpointTest extends Ca
Endpoint endpoint = context.getEndpoint("sjms:queue:test?transacted=true");
assertNotNull(endpoint);
assertTrue(endpoint instanceof SjmsEndpoint);
- SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+ SjmsEndpoint qe = (SjmsEndpoint)endpoint;
assertTrue(qe.isTransacted());
}
@@ -69,7 +69,7 @@ public class SjmsEndpointTest extends Ca
Endpoint endpoint = context.getEndpoint("sjms:queue:test?synchronous=true");
assertNotNull(endpoint);
assertTrue(endpoint instanceof SjmsEndpoint);
- SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+ SjmsEndpoint qe = (SjmsEndpoint)endpoint;
assertTrue(qe.isSynchronous());
}
@@ -79,7 +79,7 @@ public class SjmsEndpointTest extends Ca
Endpoint endpoint = context.getEndpoint("sjms:queue:test?namedReplyTo=" + namedReplyTo);
assertNotNull(endpoint);
assertTrue(endpoint instanceof SjmsEndpoint);
- SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+ SjmsEndpoint qe = (SjmsEndpoint)endpoint;
assertEquals(qe.getNamedReplyTo(), namedReplyTo);
assertEquals(qe.createExchange().getPattern(), ExchangePattern.InOut);
}
@@ -87,10 +87,10 @@ public class SjmsEndpointTest extends Ca
@Test
public void testDefaultExchangePattern() throws Exception {
try {
- SjmsEndpoint sjms = (SjmsEndpoint) context.getEndpoint("sjms:queue:test");
+ SjmsEndpoint sjms = (SjmsEndpoint)context.getEndpoint("sjms:queue:test");
assertNotNull(sjms);
assertEquals(ExchangePattern.InOnly, sjms.getExchangePattern());
-// assertTrue(sjms.createExchange().getPattern().equals(ExchangePattern.InOnly));
+ // assertTrue(sjms.createExchange().getPattern().equals(ExchangePattern.InOnly));
} catch (Exception e) {
fail("Exception thrown: " + e.getLocalizedMessage());
}
@@ -129,7 +129,7 @@ public class SjmsEndpointTest extends Ca
Endpoint endpoint = context.getEndpoint("sjms:queue:test?namedReplyTo=" + namedReplyTo + "&exchangePattern=" + ExchangePattern.InOut);
assertNotNull(endpoint);
assertTrue(endpoint instanceof SjmsEndpoint);
- SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+ SjmsEndpoint qe = (SjmsEndpoint)endpoint;
assertEquals(qe.getNamedReplyTo(), namedReplyTo);
assertEquals(qe.createExchange().getPattern(), ExchangePattern.InOut);
}
@@ -144,15 +144,14 @@ public class SjmsEndpointTest extends Ca
Endpoint endpoint = context.getEndpoint("sjms:queue:test?synchronous=true");
assertNotNull(endpoint);
assertTrue(endpoint instanceof SjmsEndpoint);
- SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+ SjmsEndpoint qe = (SjmsEndpoint)endpoint;
assertTrue(qe.getDestinationName().equals("test"));
}
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- "vm://broker?broker.persistent=false&broker.useJmx=false");
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false");
SjmsComponent component = new SjmsComponent();
component.setMaxConnections(3);
component.setConnectionFactory(connectionFactory);
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java Thu Aug 30 03:42:12 2012
@@ -16,19 +16,17 @@
*/
package org.apache.camel.component.sjms.consumer;
-
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.support.JmsTestSupport;
-
import org.junit.Test;
/**
- * @version
+ * @version
*/
public class InOnlyConsumerDefaultTest extends JmsTestSupport {
- private static final String SJMS_QUEUE_NAME = "sjms:in.only.consumer.";
+ private static final String SJMS_QUEUE_NAME = "sjms:in.only.consumer";
private static final String MOCK_RESULT = "mock:result";
@Test
@@ -39,15 +37,16 @@ public class InOnlyConsumerDefaultTest e
mock.expectedBodiesReceived(expectedBody);
template.sendBody(SJMS_QUEUE_NAME, expectedBody);
-
+
mock.assertIsSatisfied();
}
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
+ @Override
public void configure() throws Exception {
- from(SJMS_QUEUE_NAME)
- .to(MOCK_RESULT);
+ from(SJMS_QUEUE_NAME).to(MOCK_RESULT);
}
};
}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -45,7 +45,7 @@ public class InOnlyQueueConsumerTest ext
}
- /*
+ /**
* @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
*
* @return
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -45,7 +45,7 @@ public class InOnlyTopicConsumerTest ext
}
- /*
+ /**
* @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
*
* @return
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java Thu Aug 30 03:42:12 2012
@@ -171,6 +171,7 @@ public class ObjectPoolTest {
class TestPool extends ObjectPool<MyPooledObject> {
public TestPool() {
+ super();
}
public TestPool(int poolSize) {
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -65,7 +65,7 @@ public class InOnlyQueueProducerTest ext
}
- /*
+ /**
* @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
*
* @return
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java Thu Aug 30 03:42:12 2012
@@ -65,7 +65,7 @@ public class InOnlyTopicProducerTest ext
}
- /*
+ /**
* @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
*
* @return
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java Thu Aug 30 03:42:12 2012
@@ -97,15 +97,15 @@ public class InOutQueueProducerAsyncLoad
executor.execute(worker);
}
while (context.getInflightRepository().size() > 0) {
-
+ Thread.sleep(100);
}
executor.shutdown();
while (!executor.isTerminated()) {
- //
+ Thread.sleep(100);
}
}
- /*
+ /**
* @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
*
* @return
@@ -129,6 +129,10 @@ public class InOutQueueProducerAsyncLoad
protected class MyMessageListener implements MessageListener {
private MessageProducer mp;
+ public MyMessageListener() {
+ super();
+ }
+
@Override
public void onMessage(Message message) {
try {
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java Thu Aug 30 03:42:12 2012
@@ -129,6 +129,10 @@ public class InOutQueueProducerSyncLoadT
protected class MyMessageListener implements MessageListener {
private MessageProducer mp;
+ public MyMessageListener() {
+ super();
+ }
+
@Override
public void onMessage(Message message) {
try {
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -117,6 +117,7 @@ public class InOutQueueProducerTest exte
* @param response
*/
public MyMessageListener(String request, String response) {
+ super();
this.requestText = request;
this.responseText = response;
}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -91,6 +91,7 @@ public class InOutTempQueueProducerTest
* @param response
*/
public MyMessageListener(String request, String response) {
+ super();
this.requestText = request;
this.responseText = response;
}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -65,7 +65,7 @@ public class QueueProducerTest extends J
}
- /*
+ /**
* @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
*
* @return