You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/02 12:56:44 UTC
svn commit: r1368413 [2/5] - in /camel/trunk: components/
components/camel-sjms/ components/camel-sjms/src/
components/camel-sjms/src/main/ components/camel-sjms/src/main/java/
components/camel-sjms/src/main/java/org/
components/camel-sjms/src/main/jav...
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.consumer;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * TODO Add Class documentation for InOutMessageHandler
+ * TODO Create a producer cache manager to store and purge unused cached producers or we will have a memory leak
+ *
+ */
+public class InOutMessageHandler extends DefaultMessageHandler {
+
+ private Map<String, MessageProducer> producerCache = new TreeMap<String, MessageProducer>();
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * Creates a handler without a {@link Synchronization}; delegates to the
+ * three-argument constructor, passing {@code null} for the synchronization.
+ *
+ * @param endpoint the endpoint handed on to the three-argument constructor
+ * @param executor the executor service handed on to the three-argument constructor
+ */
+ public InOutMessageHandler(Endpoint endpoint, ExecutorService executor) {
+ this(endpoint, executor, null);
+ }
+
+ /**
+ * Creates a handler; all three arguments are passed unchanged to the
+ * superclass constructor.
+ *
+ * @param endpoint the endpoint passed to the superclass
+ * @param executor the executor service passed to the superclass
+ * @param synchronization the synchronization passed to the superclass; the
+ * two-argument constructor supplies {@code null} here
+ */
+ public InOutMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
+ super(endpoint, executor, synchronization);
+ }
+
+ /**
+ * @param message
+ */
+ @Override
+ public void doHandleMessage(final Exchange exchange) {
+ try {
+ MessageProducer messageProducer = null;
+ Object obj = exchange.getIn().getHeader("JMSReplyTo");
+ if (obj != null) {
+ Destination replyTo = null;
+ if (isDestination(obj)) {
+ replyTo = (Destination)obj;
+ } else if (obj instanceof String) {
+ replyTo = JmsObjectFactory.createDestination(getSession(), (String)obj, isTopic());
+ } else {
+ throw new Exception("The value of JMSReplyTo must be a valid Destination or String. Value provided: "
+ + obj);
+ }
+
+ String destinationName = getDestinationName(replyTo);
+ try {
+ lock.readLock().lock();
+ if (producerCache.containsKey(destinationName)) {
+ messageProducer = producerCache.get(destinationName);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ if (messageProducer == null) {
+ try {
+ lock.writeLock().lock();
+ messageProducer = getSession().createProducer(replyTo);
+ producerCache.put(destinationName, messageProducer);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ }
+
+ MessageHanderAsyncCallback callback = new MessageHanderAsyncCallback(exchange, messageProducer);
+ if (exchange.isFailed()) {
+ return;
+ } else {
+ if (isTransacted() || isSynchronous()) {
+ // must process synchronous if transacted or configured to
+ // do so
+ if (log.isDebugEnabled()) {
+ log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
+ }
+ try {
+ AsyncProcessorHelper.process(getProcessor(), exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ } finally {
+ callback.done(true);
+ }
+ } else {
+ // process asynchronous using the async routing engine
+ if (log.isDebugEnabled()) {
+ log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
+ }
+ boolean sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback);
+ if (!sync) {
+ // will be done async so return now
+ return;
+ }
+ }
+ }
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("SjmsMessageConsumer invoked for Exchange id:{} ",
+ exchange.getExchangeId());
+ }
+ }
+
+    /**
+     * Closes every cached reply producer and empties the cache.
+     * <p/>
+     * A producer that fails to close is logged and skipped so that the
+     * remaining producers are still closed. The write lock is held for the
+     * duration so the cache is not read or filled while it is torn down.
+     */
+    @Override
+    public void close() {
+        lock.writeLock().lock();
+        try {
+            for (Map.Entry<String, MessageProducer> entry : producerCache.entrySet()) {
+                try {
+                    entry.getValue().close();
+                } catch (JMSException e) {
+                    // best effort: report the failure instead of silently dropping it
+                    log.warn("Error closing cached reply producer for destination: " + entry.getKey(), e);
+                }
+            }
+            producerCache.clear();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+ private boolean isDestination(Object object) {
+ return object instanceof Destination;
+ }
+
+    /**
+     * Returns the queue or topic name of the destination.
+     *
+     * @param destination the destination to inspect
+     * @return the name, or <tt>null</tt> if the destination is neither a
+     *         {@link Queue} nor a {@link Topic}
+     * @throws Exception if the JMS provider fails to return the name
+     */
+    private String getDestinationName(Destination destination) throws Exception {
+        if (destination instanceof Queue) {
+            return ((Queue)destination).getQueueName();
+        }
+        if (destination instanceof Topic) {
+            return ((Topic)destination).getTopicName();
+        }
+        return null;
+    }
+
+    /**
+     * Callback that builds the JMS reply from the exchange and sends it with
+     * the producer resolved for the request's reply destination.
+     */
+    protected class MessageHanderAsyncCallback implements AsyncCallback {
+
+        private final Exchange exchange;
+        private final MessageProducer localProducer;
+
+        /**
+         * @param exchange the exchange being processed
+         * @param localProducer the producer for the reply destination, or
+         *            <tt>null</tt> when the request carried no JMSReplyTo
+         */
+        public MessageHanderAsyncCallback(Exchange exchange, MessageProducer localProducer) {
+            super();
+            this.exchange = exchange;
+            this.localProducer = localProducer;
+        }
+
+        /**
+         * Creates the response message, copies the request's correlation id
+         * onto it and sends it. Any failure is stored on the exchange.
+         *
+         * @param sync whether processing completed synchronously (not used here)
+         */
+        @Override
+        public void done(boolean sync) {
+            try {
+                if (localProducer == null) {
+                    // report the real cause instead of a bare NullPointerException
+                    throw new IllegalStateException("Unable to send reply: no producer available because the request carried no JMSReplyTo");
+                }
+                Message response = JmsMessageHelper.createMessage(exchange,
+                                                                  getSession(), true);
+                response.setJMSCorrelationID(exchange.getIn().getHeader(
+                                             "JMSCorrelationID", String.class));
+                localProducer.send(response);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+        }
+    }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * String constants used by the sjms JMS helper classes. The class only holds
+ * constants and cannot be instantiated.
+ */
+public final class JmsConstants {
+
+ // NOTE(review): the first three constants look like Camel header/property
+ // keys, but their use is not visible from this file - confirm against callers.
+ public static final String JMS_DESTINATION = "CamelJmsDestination";
+ public static final String JMS_DESTINATION_NAME = "CamelJmsDestinationName";
+ public static final String JMS_MESSAGE_TYPE = "CamelJmsMessageType";
+ // name of the int property JmsMessageHelper.setJMSDeliveryMode sets on the JMS message
+ public static final String JMS_DELIVERY_MODE = "CamelJmsDeliveryMode";
+
+ private JmsConstants() {
+ // utility class
+ }
+
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * Names of the JMS message headers handled by the sjms component. Each
+ * constant is spelled exactly like the header it stands for, so its
+ * <tt>toString()</tt> value can be used directly as a header key.
+ */
+public enum JmsMessageHeaderType {
+ JMSDestination,
+ JMSDeliveryMode,
+ JMSExpiration,
+ JMSPriority,
+ JMSMessageID,
+ JMSTimestamp,
+ JMSCorrelationID,
+ JMSReplyTo,
+ JMSType,
+ JMSRedelivered,
+
+ /*
+ * Known custom (non-standard) headers
+ */
+ JMSXGroupID
+
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.sjms.DefaultJmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.IllegalHeaderException;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE;
+import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX;
+import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX;
+import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
+
+/**
+ * Utility class for {@link javax.jms.Message}.
+ *
+ * @version
+ */
+public final class JmsMessageHelper {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+
+ private JmsMessageHelper() {
+ // utility class: static methods only, not meant to be instantiated
+ }
+
+    /**
+     * Creates a new exchange from the endpoint and fills its in message from
+     * the given JMS message.
+     *
+     * @param message the received JMS message
+     * @param endpoint the endpoint used to create the exchange
+     * @return the result of {@link #populateExchange(Message, Exchange, boolean)}
+     */
+    public static Exchange createExchange(Message message, Endpoint endpoint) {
+        return populateExchange(message, endpoint.createExchange(), false);
+    }
+
+ @SuppressWarnings("unchecked")
+ public static Exchange populateExchange(Message message, Exchange exchange, boolean out) {
+ try {
+ JmsMessageHelper.setJmsMessageHeaders(message, exchange, out);
+ if (message != null) {
+ // convert to JMS Message of the given type
+
+ DefaultMessage bodyMessage = null;
+ if (out) {
+ bodyMessage = (DefaultMessage) exchange.getOut();
+ } else {
+ bodyMessage = (DefaultMessage) exchange.getIn();
+ }
+ switch (JmsMessageHelper.discoverType(message)) {
+ case Bytes:
+ BytesMessage bytesMessage = (BytesMessage) message;
+ if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
+ LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
+ return null;
+ }
+ byte[] result = new byte[(int) bytesMessage.getBodyLength()];
+ bytesMessage.readBytes(result);
+ bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Bytes);
+ bodyMessage.setBody(result);
+ break;
+ case Map:
+ HashMap<String, Object> body = new HashMap<String, Object>();
+ MapMessage mapMessage = (MapMessage) message;
+ Enumeration<String> names = mapMessage.getMapNames();
+ while (names.hasMoreElements()) {
+ String key = names.nextElement();
+ Object value = mapMessage.getObject(key);
+ body.put(key, value);
+ }
+ bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Map);
+ bodyMessage.setBody(body);
+ break;
+ case Object:
+ ObjectMessage objMsg = (ObjectMessage) message;
+ bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Object);
+ bodyMessage.setBody(objMsg.getObject());
+ break;
+ case Text:
+ TextMessage textMsg = (TextMessage) message;
+ bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Text);
+ bodyMessage.setBody(textMsg.getText());
+ break;
+ case Message:
+ default:
+ // Do nothing. Only set the headers for an empty message
+ bodyMessage.setBody(message);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ return exchange;
+ }
+
+    /**
+     * Removes the property from the JMS message.
+     *
+     * @param jmsMessage the JMS message
+     * @param name name of the property to remove
+     * @return the old value of the property or <tt>null</tt> if not exists
+     * @throws JMSException can be thrown
+     */
+    public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException {
+        // check if the property exists
+        if (!jmsMessage.propertyExists(name)) {
+            return null;
+        }
+
+        // capture the old value (not the property name) before it is cleared
+        Object answer = jmsMessage.getObjectProperty(name);
+
+        // store the properties we want to keep in a temporary map
+        // as the JMS API is a bit strict as we are not allowed to
+        // clear a single property, but must clear them all and redo
+        // the properties
+        Map<String, Object> map = new LinkedHashMap<String, Object>();
+        Enumeration<?> en = jmsMessage.getPropertyNames();
+        while (en.hasMoreElements()) {
+            String key = (String) en.nextElement();
+            if (!name.equals(key)) {
+                map.put(key, jmsMessage.getObjectProperty(key));
+            }
+        }
+
+        // redo the properties to keep
+        jmsMessage.clearProperties();
+        for (Entry<String, Object> entry : map.entrySet()) {
+            jmsMessage.setObjectProperty(entry.getKey(), entry.getValue());
+        }
+
+        return answer;
+    }
+
+ /**
+ * Tests whether a given property with the name exists
+ *
+ * @param jmsMessage the JMS message
+ * @param name name of the property to test if exists
+ * @return <tt>true</tt> if the property exists, <tt>false</tt> if not.
+ * @throws JMSException can be thrown
+ */
+ public static boolean hasProperty(Message jmsMessage, String name) throws JMSException {
+ Enumeration<?> en = jmsMessage.getPropertyNames();
+ while (en.hasMoreElements()) {
+ String key = (String) en.nextElement();
+ if (name.equals(key)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+    /**
+     * Stores a value as a property of the JMS message, using the typed
+     * setter that matches the value's runtime class and falling back to
+     * <tt>setObjectProperty</tt> for anything else. A <tt>null</tt> value is
+     * ignored.
+     *
+     * @param jmsMessage the JMS message
+     * @param name name of the property to set
+     * @param value the value, may be <tt>null</tt>
+     * @throws JMSException can be thrown
+     */
+    public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
+        if (value == null) {
+            // nothing to set
+            return;
+        }
+
+        if (value instanceof String) {
+            jmsMessage.setStringProperty(name, (String) value);
+            return;
+        }
+        if (value instanceof Boolean) {
+            jmsMessage.setBooleanProperty(name, (Boolean) value);
+            return;
+        }
+        if (value instanceof Integer) {
+            jmsMessage.setIntProperty(name, (Integer) value);
+            return;
+        }
+        if (value instanceof Long) {
+            jmsMessage.setLongProperty(name, (Long) value);
+            return;
+        }
+        if (value instanceof Double) {
+            jmsMessage.setDoubleProperty(name, (Double) value);
+            return;
+        }
+        if (value instanceof Float) {
+            jmsMessage.setFloatProperty(name, (Float) value);
+            return;
+        }
+        if (value instanceof Short) {
+            jmsMessage.setShortProperty(name, (Short) value);
+            return;
+        }
+        if (value instanceof Byte) {
+            jmsMessage.setByteProperty(name, (Byte) value);
+            return;
+        }
+
+        // no typed setter applies, let the provider decide
+        jmsMessage.setObjectProperty(name, value);
+    }
+
+    /**
+     * Applies the correlation id to the JMS message.
+     * <p/>
+     * A {@link JMSException} raised by the provider is not propagated; it is
+     * only noted at debug level.
+     *
+     * @param message the JMS message
+     * @param correlationId the correlation id
+     */
+    public static void setCorrelationId(Message message, String correlationId) {
+        try {
+            message.setJMSCorrelationID(correlationId);
+        } catch (JMSException e) {
+            // parameterized logging is a no-op unless debug is enabled
+            LOGGER.debug("Error setting the correlationId: {}", correlationId);
+        }
+    }
+
+ /**
+ * Normalizes the destination name, by removing any leading queue or topic prefixes.
+ *
+ * @param destination the destination
+ * @return the normalized destination
+ */
+ public static String normalizeDestinationName(String destination) {
+ if (ObjectHelper.isEmpty(destination)) {
+ return destination;
+ }
+ if (destination.startsWith(QUEUE_PREFIX)) {
+ return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+ } else if (destination.startsWith(TOPIC_PREFIX)) {
+ return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+ } else {
+ return destination;
+ }
+ }
+
+    /**
+     * Sets the JMSReplyTo on the message.
+     * <p/>
+     * Will ignore exception thrown; the failure is only logged at debug level.
+     *
+     * @param message the message
+     * @param replyTo the reply to destination, may be <tt>null</tt>
+     */
+    public static void setJMSReplyTo(Message message, Destination replyTo) {
+        try {
+            message.setJMSReplyTo(replyTo);
+        } catch (Exception e) {
+            // pass replyTo itself rather than replyTo.toString(): a null
+            // destination must not raise a NullPointerException from here
+            LOGGER.debug("Error setting the JMSReplyTo: {}", replyTo);
+        }
+    }
+
+ /**
+ * Gets the JMSReplyTo from the message.
+ *
+ * @param message the message
+ * @return the reply to, can be <tt>null</tt>
+ */
+ public static Destination getJMSReplyTo(Message message) {
+ try {
+ return message.getJMSReplyTo();
+ } catch (Exception e) {
+ // ignore due OracleAQ does not support accessing JMSReplyTo
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets the JMSType from the message.
+ *
+ * @param message the message
+ * @return the type, can be <tt>null</tt>
+ */
+ public static String getJMSType(Message message) {
+ try {
+ return message.getJMSType();
+ } catch (Exception e) {
+ // ignore due OracleAQ does not support accessing JMSType
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets the JMSRedelivered from the message.
+ *
+ * @param message the message
+ * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, <tt>null</tt> if not able to determine
+ */
+ public static Boolean getJMSRedelivered(Message message) {
+ try {
+ return message.getJMSRedelivered();
+ } catch (Exception e) {
+ // ignore if JMS broker do not support this
+ }
+
+ return null;
+ }
+
+    /**
+     * Applies a delivery mode to the message and mirrors it in the
+     * {@link JmsConstants#JMS_DELIVERY_MODE} int property.
+     * <p/>
+     * The strings <tt>PERSISTENT</tt> and <tt>NON_PERSISTENT</tt> are
+     * recognized case-insensitively; any other value is converted to an
+     * integer. A string that cannot be converted is rejected, while a
+     * non-string value that cannot be converted leaves the message untouched.
+     *
+     * @param exchange the exchange
+     * @param message the message
+     * @param deliveryMode the delivery mode, either as a String or integer
+     * @throws javax.jms.JMSException is thrown if error setting the delivery mode
+     */
+    public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode) throws JMSException {
+        final boolean textual = deliveryMode instanceof String;
+        Integer mode;
+
+        if (textual && "PERSISTENT".equalsIgnoreCase((String) deliveryMode)) {
+            mode = DeliveryMode.PERSISTENT;
+        } else if (textual && "NON_PERSISTENT".equalsIgnoreCase((String) deliveryMode)) {
+            mode = DeliveryMode.NON_PERSISTENT;
+        } else {
+            // neither keyword matched, so try to read the value as a number
+            mode = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
+            if (mode == null && textual) {
+                throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode);
+            }
+        }
+
+        if (mode == null) {
+            return;
+        }
+        message.setJMSDeliveryMode(mode);
+        message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode);
+    }
+
+
+ public static Message setJmsMessageHeaders(final Exchange exchange, final Message jmsMessage) throws Exception {
+ Map<String, Object> headers = new HashMap<String, Object>(exchange.getIn().getHeaders());
+ Set<String> keys = headers.keySet();
+ for (String headerName : keys) {
+ Object headerValue = headers.get(headerName);
+ if (headerName.equalsIgnoreCase("JMSCorrelationID")) {
+ jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+ } else if (headerName.equalsIgnoreCase("JMSReplyTo") && headerValue != null) {
+ if (headerValue instanceof String) {
+ // if the value is a String we must normalize it first
+ headerValue = (String) headerValue;
+ } else {
+ // TODO write destination converter
+ // Destination replyTo =
+ // ExchangeHelper.convertToType(exchange, Destination.class,
+ // headerValue);
+ // jmsMessage.setJMSReplyTo(replyTo);
+ }
+ } else if (headerName.equalsIgnoreCase("JMSType")) {
+ jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+ } else if (headerName.equalsIgnoreCase("JMSPriority")) {
+ jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
+ } else if (headerName.equalsIgnoreCase("JMSDeliveryMode")) {
+ JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+ } else if (headerName.equalsIgnoreCase("JMSExpiration")) {
+ jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
+ } else {
+ // The following properties are set by the MessageProducer:
+ // JMSDestination
+ // The following are set on the underlying JMS provider:
+ // JMSMessageID, JMSTimestamp, JMSRedelivered
+ // log at trace level to not spam log
+ LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+ if (headerName.equalsIgnoreCase("JMSDestination")
+ || headerName.equalsIgnoreCase("JMSMessageID")
+ || headerName.equalsIgnoreCase("JMSTimestamp")
+ || headerName.equalsIgnoreCase("JMSRedelivered")) {
+ // The following properties are set by the MessageProducer:
+ // JMSDestination
+ // The following are set on the underlying JMS provider:
+ // JMSMessageID, JMSTimestamp, JMSRedelivered
+ // log at trace level to not spam log
+ LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+ } else {
+ if (!(headerValue instanceof JmsMessageType)) {
+ String encodedName = new DefaultJmsKeyFormatStrategy().encodeKey(headerName);
+ JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+ }
+ }
+ }
+ }
+ return jmsMessage;
+ }
+
+    /**
+     * Maps a JMS message onto the matching {@link JmsMessageType} by testing
+     * it against the bytes, map, text and object message interfaces, in that
+     * order.
+     *
+     * @param value the JMS message, may be <tt>null</tt>
+     * @return the type, {@link JmsMessageType#Message} if none of the typed
+     *         interfaces applies, or <tt>null</tt> for a <tt>null</tt> message
+     * @throws Exception declared for callers; nothing here raises it
+     */
+    public static JmsMessageType discoverType(Message value) throws Exception {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof BytesMessage) {
+            return JmsMessageType.Bytes;
+        }
+        if (value instanceof MapMessage) {
+            return JmsMessageType.Map;
+        }
+        if (value instanceof TextMessage) {
+            return JmsMessageType.Text;
+        }
+        if (value instanceof ObjectMessage) {
+            return JmsMessageType.Object;
+        }
+        return JmsMessageType.Message;
+    }
+
+    /**
+     * Determines which kind of JMS message should carry the exchange's in
+     * message.
+     * <p/>
+     * The <tt>JMS_MESSAGE_TYPE</tt> header wins when present. Otherwise the
+     * type is derived from the body: <tt>byte[]</tt> maps to bytes,
+     * {@link Map} to map, {@link String} to text, any other
+     * {@link Serializable} to object and everything else to a plain message.
+     * These are the body types {@link #createMessage(Exchange, Session, boolean)}
+     * casts to.
+     *
+     * @param exchange the exchange to inspect
+     * @return the message type, or <tt>null</tt> if there is neither a type
+     *         header nor a body
+     */
+    public static JmsMessageType discoverType(final Exchange exchange) {
+        JmsMessageType answer = (JmsMessageType) exchange.getIn().getHeader(JMS_MESSAGE_TYPE);
+        if (answer == null) {
+            final Object value = exchange.getIn().getBody();
+            if (value != null) {
+                // byte[] and Map must be tested before Serializable, which
+                // arrays and the common map implementations also satisfy
+                if (value instanceof byte[]) {
+                    answer = JmsMessageType.Bytes;
+                } else if (value instanceof Map) {
+                    answer = JmsMessageType.Map;
+                } else if (value instanceof String) {
+                    answer = JmsMessageType.Text;
+                } else if (value instanceof Serializable) {
+                    answer = JmsMessageType.Object;
+                } else {
+                    answer = JmsMessageType.Message;
+                }
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Replaces the headers of the exchange's in or out message with the
+     * standard JMS headers, the JMSXGroupID property and all properties of
+     * the given JMS message. With a <tt>null</tt> JMS message an empty header
+     * map is set.
+     *
+     * @param jmsMessage the JMS message to read, may be <tt>null</tt>
+     * @param exchange the exchange to update
+     * @param out <tt>true</tt> to set the headers on the out message,
+     *            <tt>false</tt> for the in message
+     * @return the given exchange
+     * @throws JMSException if the message properties cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out)
+        throws JMSException {
+        final HashMap<String, Object> collected = new HashMap<String, Object>();
+
+        if (jmsMessage != null) {
+            // standard JMS headers first; a provider failure is rethrown unchecked
+            try {
+                collected.put(JmsMessageHeaderType.JMSCorrelationID.toString(), jmsMessage.getJMSCorrelationID());
+                collected.put(JmsMessageHeaderType.JMSDeliveryMode.toString(), jmsMessage.getJMSDeliveryMode());
+                collected.put(JmsMessageHeaderType.JMSDestination.toString(), jmsMessage.getJMSDestination());
+                collected.put(JmsMessageHeaderType.JMSExpiration.toString(), jmsMessage.getJMSExpiration());
+                collected.put(JmsMessageHeaderType.JMSMessageID.toString(), jmsMessage.getJMSMessageID());
+                collected.put(JmsMessageHeaderType.JMSPriority.toString(), jmsMessage.getJMSPriority());
+                collected.put(JmsMessageHeaderType.JMSRedelivered.toString(), jmsMessage.getJMSRedelivered());
+                collected.put(JmsMessageHeaderType.JMSTimestamp.toString(), jmsMessage.getJMSTimestamp());
+                collected.put(JmsMessageHeaderType.JMSReplyTo.toString(), JmsMessageHelper.getJMSReplyTo(jmsMessage));
+                collected.put(JmsMessageHeaderType.JMSType.toString(), JmsMessageHelper.getJMSType(jmsMessage));
+
+                // this works around a bug in the ActiveMQ property handling
+                final String groupIdKey = JmsMessageHeaderType.JMSXGroupID.toString();
+                collected.put(groupIdKey, jmsMessage.getStringProperty(groupIdKey));
+            } catch (JMSException e) {
+                throw new RuntimeCamelException(e);
+            }
+
+            // then every message property, rejecting illegal names
+            Enumeration<String> propertyNames = jmsMessage.getPropertyNames();
+            while (propertyNames.hasMoreElements()) {
+                String key = propertyNames.nextElement();
+                if (hasIllegalHeaderKey(key)) {
+                    throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value");
+                }
+                collected.put(key, jmsMessage.getObjectProperty(key));
+            }
+        }
+
+        (out ? exchange.getOut() : exchange.getIn()).setHeaders(collected);
+        return exchange;
+    }
+
+    /**
+     * Builds a JMS message from the exchange's in message; shorthand for
+     * {@link #createMessage(Exchange, Session, boolean)} with <tt>out</tt>
+     * set to <tt>false</tt>.
+     *
+     * @param exchange the exchange to read
+     * @param session the session used to create the message
+     * @return the created message
+     * @throws Exception if the message cannot be created
+     */
+    public static Message createMessage(Exchange exchange, Session session) throws Exception {
+        final boolean useOut = false;
+        return createMessage(exchange, session, useOut);
+    }
+
+    /**
+     * Builds a JMS message from the exchange and copies the in message's
+     * headers onto it.
+     * <p/>
+     * The body comes from the out message when <tt>out</tt> is set and an out
+     * body exists, otherwise from the in message. The JMS message type is the
+     * one reported by {@link #discoverType(Exchange)}; when no typed message
+     * applies a plain {@link Message} without body is created.
+     *
+     * @param exchange the exchange to read
+     * @param session the session used to create the message
+     * @param out <tt>true</tt> to prefer the out message body
+     * @return the created message, never <tt>null</tt>
+     * @throws Exception if the message cannot be created or populated
+     */
+    @SuppressWarnings("unchecked")
+    public static Message createMessage(Exchange exchange, Session session, boolean out) throws Exception {
+        Object body;
+        // hasOut() first: getOut() would create an empty out message as a side effect
+        if (out && exchange.hasOut() && exchange.getOut().getBody() != null) {
+            body = exchange.getOut().getBody();
+        } else {
+            body = exchange.getIn().getBody();
+        }
+
+        // NOTE(review): the type is discovered from the in message even when
+        // the out body is used - confirm both always carry the same kind of body
+        JmsMessageType messageType = JmsMessageHelper.discoverType(exchange);
+        if (messageType == null) {
+            // neither a type header nor a body: fall back to a plain message
+            messageType = JmsMessageType.Message;
+        }
+
+        Message answer;
+        switch (messageType) {
+        case Bytes:
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes((byte[])body);
+            answer = bytesMessage;
+            break;
+        case Map:
+            MapMessage mapMessage = session.createMapMessage();
+            Map<String, Object> objMap = (Map<String, Object>)body;
+            for (Entry<String, Object> entry : objMap.entrySet()) {
+                mapMessage.setObject(entry.getKey(), entry.getValue());
+            }
+            answer = mapMessage;
+            break;
+        case Object:
+            ObjectMessage objectMessage = session.createObjectMessage();
+            objectMessage.setObject((Serializable)body);
+            answer = objectMessage;
+            break;
+        case Text:
+            TextMessage textMessage = session.createTextMessage();
+            textMessage.setText((String)body);
+            answer = textMessage;
+            break;
+        default:
+            // types without a dedicated JMS message get a body-less message,
+            // so the header copy below never receives null
+            answer = session.createMessage();
+            break;
+        }
+
+        return JmsMessageHelper.setJmsMessageHeaders(exchange, answer);
+    }
+
+    /**
+     * Tells whether a property name is rejected as a header key: a
+     * <tt>null</tt> or empty name, or one containing a dot or a hyphen.
+     *
+     * @param key the property name to check
+     * @return <tt>true</tt> if the key is rejected, <tt>false</tt> otherwise
+     */
+    private static boolean hasIllegalHeaderKey(String key) {
+        return key == null
+            || key.length() == 0
+            || key.indexOf('.') >= 0
+            || key.indexOf('-') >= 0;
+    }
+
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * Enumerates the kinds of {@link javax.jms.Message} this component
+ * distinguishes between when converting message bodies.
+ */
+public enum JmsMessageType {
+
+ // Standard JMS message types first.
+
+ /**
+ * Body is carried in a {@link javax.jms.BytesMessage}.
+ */
+ Bytes,
+ /**
+ * Body is carried in a {@link javax.jms.MapMessage}.
+ */
+ Map,
+ /**
+ * Body is carried in an {@link javax.jms.ObjectMessage}.
+ */
+ Object,
+ // TODO Write support for Stream Messages
+ //Stream,
+ /**
+ * Body is carried in a {@link javax.jms.TextMessage}.
+ */
+ Text,
+
+ /**
+ * BlobMessage which is not supported by all JMS implementations
+ */
+ Blob,
+
+ /**
+ * The default type that can be used for empty messages.
+ */
+ Message
+
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Static factory methods that build JMS {@link Destination}s,
+ * {@link MessageConsumer}s and {@link MessageProducer}s from a
+ * {@link Session}.
+ */
+public final class JmsObjectFactory {
+
+    private static final String TOPIC_PREFIX = "topic://";
+    private static final String QUEUE_PREFIX = "queue://";
+
+    private JmsObjectFactory() {
+        // static helper, never instantiated
+    }
+
+    /**
+     * Creates a topic or a queue with the given name.
+     *
+     * @param session the session used to create the destination
+     * @param destinationName the name of the destination
+     * @param topic {@code true} for a topic, {@code false} for a queue
+     * @return the created destination
+     * @throws Exception if the session fails to create the destination
+     */
+    public static Destination createDestination(Session session, String destinationName, boolean topic)
+        throws Exception {
+        return topic ? createTopic(session, destinationName) : createQueue(session, destinationName);
+    }
+
+    /**
+     * Creates a queue with the given name.
+     */
+    public static Destination createQueue(Session session, String destinationName) throws Exception {
+        return session.createQueue(destinationName);
+    }
+
+    /**
+     * Creates a temporary topic or a temporary queue.
+     *
+     * @param session the session used to create the destination
+     * @param topic {@code true} for a temporary topic, {@code false} for a temporary queue
+     * @return the created temporary destination
+     * @throws Exception if the session fails to create the destination
+     */
+    public static Destination createTemporaryDestination(Session session, boolean topic) throws Exception {
+        return topic ? session.createTemporaryTopic() : session.createTemporaryQueue();
+    }
+
+    /**
+     * Creates a topic with the given name.
+     */
+    public static Destination createTopic(Session session, String destinationName) throws Exception {
+        return session.createTopic(destinationName);
+    }
+
+    /**
+     * Creates a consumer on the named queue without a message selector.
+     */
+    public static MessageConsumer createQueueConsumer(Session session, String destinationName) throws Exception {
+        return createQueueConsumer(session, destinationName, null);
+    }
+
+    /**
+     * Creates a consumer on the named queue, optionally filtered by a message selector.
+     */
+    public static MessageConsumer createQueueConsumer(Session session, String destinationName, String messageSelector) throws Exception {
+        return createMessageConsumer(session, destinationName, messageSelector, false, null, true);
+    }
+
+    /**
+     * Creates a non-durable consumer on the named topic, optionally filtered by a message selector.
+     */
+    public static MessageConsumer createTopicConsumer(Session session, String destinationName, String messageSelector) throws Exception {
+        return createMessageConsumer(session, destinationName, messageSelector, true, null, true);
+    }
+
+    /**
+     * Creates a temporary destination and a consumer listening on it.
+     *
+     * @see #createMessageConsumer(Session, Destination, String, boolean, String, boolean)
+     */
+    public static MessageConsumer createTemporaryMessageConsumer(
+            Session session,
+            String messageSelector,
+            boolean topic,
+            String durableSubscriptionId,
+            boolean noLocal) throws Exception {
+        final Destination temporary = createTemporaryDestination(session, topic);
+        return createMessageConsumer(session, temporary, messageSelector, topic, durableSubscriptionId, noLocal);
+    }
+
+    /**
+     * Same as the six-argument variant with {@code noLocal} set to {@code true}.
+     */
+    public static MessageConsumer createMessageConsumer(
+            Session session,
+            String destinationName,
+            String messageSelector,
+            boolean topic,
+            String durableSubscriptionId) throws Exception {
+        return createMessageConsumer(session, destinationName, messageSelector, topic, durableSubscriptionId, true);
+    }
+
+    /**
+     * Resolves the named topic or queue and creates a consumer on it.
+     *
+     * @see #createMessageConsumer(Session, Destination, String, boolean, String, boolean)
+     */
+    public static MessageConsumer createMessageConsumer(
+            Session session,
+            String destinationName,
+            String messageSelector,
+            boolean topic,
+            String durableSubscriptionId,
+            boolean noLocal) throws Exception {
+        final Destination resolved = createDestination(session, destinationName, topic);
+        return createMessageConsumer(session, resolved, messageSelector, topic, durableSubscriptionId, noLocal);
+    }
+
+    /**
+     * Creates a consumer on the given destination.
+     * <p>
+     * For topics the session is cast to {@link TopicSession} and the
+     * destination to {@link Topic}; a durable subscriber is created when
+     * {@code durableSubscriptionId} is not empty. {@code noLocal} is only
+     * passed on when a message selector is supplied. For queues only the
+     * selector is taken into account.
+     *
+     * @param session the session used to create the consumer
+     * @param destination the destination to consume from
+     * @param messageSelector optional message selector, may be {@code null} or empty
+     * @param topic {@code true} when the destination is a topic
+     * @param durableSubscriptionId optional durable subscription name (topics only)
+     * @param noLocal the noLocal flag handed to the topic subscriber (topics with selector only)
+     * @return the created consumer
+     * @throws Exception if the consumer cannot be created
+     */
+    public static MessageConsumer createMessageConsumer(
+            Session session,
+            Destination destination,
+            String messageSelector,
+            boolean topic,
+            String durableSubscriptionId,
+            boolean noLocal) throws Exception {
+        if (!topic) {
+            return ObjectHelper.isNotEmpty(messageSelector)
+                ? session.createConsumer(destination, messageSelector)
+                : session.createConsumer(destination);
+        }
+
+        final TopicSession topicSession = (TopicSession)session;
+        final boolean selective = ObjectHelper.isNotEmpty(messageSelector);
+        if (ObjectHelper.isNotEmpty(durableSubscriptionId)) {
+            return selective
+                ? topicSession.createDurableSubscriber((Topic)destination, durableSubscriptionId, messageSelector, noLocal)
+                : topicSession.createDurableSubscriber((Topic)destination, durableSubscriptionId);
+        }
+        return selective
+            ? topicSession.createSubscriber((Topic)destination, messageSelector, noLocal)
+            : topicSession.createSubscriber((Topic)destination);
+    }
+
+    /**
+     * Creates a persistent producer for the named queue with no explicit time to live.
+     */
+    public static MessageProducer createQueueProducer(
+            Session session,
+            String destinationName) throws Exception {
+        return createMessageProducer(session, destinationName, false, true, -1);
+    }
+
+    /**
+     * Creates a non-persistent producer for the named topic with no explicit time to live.
+     */
+    public static MessageProducer createTopicProducer(
+            Session session,
+            String destinationName) throws Exception {
+        return createMessageProducer(session, destinationName, true, false, -1);
+    }
+
+    /**
+     * Creates a producer for the named topic or queue.
+     * <p>
+     * A leading {@code topic://} (for topics) or {@code queue://} (for
+     * queues) is removed from the name before the destination is created.
+     *
+     * @param session the session used to create the producer
+     * @param destinationName the destination name, optionally prefixed
+     * @param topic {@code true} for a topic, {@code false} for a queue
+     * @param persistent selects {@link DeliveryMode#PERSISTENT} or {@link DeliveryMode#NON_PERSISTENT}
+     * @param ttl time to live in the unit expected by {@link MessageProducer#setTimeToLive(long)};
+     *            only applied when greater than zero
+     * @return the configured producer
+     * @throws Exception if the destination or producer cannot be created
+     */
+    public static MessageProducer createMessageProducer(
+            Session session,
+            String destinationName,
+            boolean topic,
+            boolean persistent,
+            long ttl) throws Exception {
+        final String plainName = stripPrefix(destinationName, topic ? TOPIC_PREFIX : QUEUE_PREFIX);
+        final Destination target = topic ? session.createTopic(plainName) : session.createQueue(plainName);
+
+        final MessageProducer producer = session.createProducer(target);
+        producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+        if (ttl > 0) {
+            producer.setTimeToLive(ttl);
+        }
+        return producer;
+    }
+
+    /**
+     * Returns the name without the given prefix, or unchanged when it does not start with it.
+     */
+    private static String stripPrefix(String name, String prefix) {
+        return name.startsWith(prefix) ? name.substring(prefix.length()) : name;
+    }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import javax.jms.Session;
+
+/**
+ * Enumerates the JMS session acknowledgement modes, each paired with the
+ * integer constant of the same name declared on {@link Session}.
+ */
+public enum SessionAcknowledgementType {
+    AUTO_ACKNOWLEDGE(Session.AUTO_ACKNOWLEDGE),
+    CLIENT_ACKNOWLEDGE(Session.CLIENT_ACKNOWLEDGE),
+    DUPS_OK_ACKNOWLEDGE(Session.DUPS_OK_ACKNOWLEDGE),
+    SESSION_TRANSACTED(Session.SESSION_TRANSACTED);
+
+    /** The {@link Session} constant this mode maps to. */
+    private final int jmsConstant;
+
+    SessionAcknowledgementType(int jmsConstant) {
+        this.jmsConstant = jmsConstant;
+    }
+
+    /**
+     * @return the {@link Session} integer constant for this mode
+     */
+    public int intValue() {
+        return jmsConstant;
+    }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.component.sjms.ConnectionResource;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * A pool of started JMS {@link Connection}s created from a
+ * {@link ConnectionFactory}, optionally authenticated with a username and
+ * password and tagged with a client id.
+ */
+public class DefaultConnectionResource extends ObjectPool<Connection> implements ConnectionResource {
+    private ConnectionFactory connectionFactory;
+    private String username;
+    private String password;
+    private String connectionId;
+
+    /**
+     * Creates a resource with the default pool size and no connection
+     * factory; one has to be supplied through
+     * {@link #setConnectionFactory(ConnectionFactory)}.
+     */
+    public DefaultConnectionResource() {
+        super();
+    }
+
+    /**
+     * Creates a resource that opens anonymous connections.
+     *
+     * @param poolSize the maximum number of pooled connections
+     * @param connectionFactory the factory used to create the connections
+     */
+    public DefaultConnectionResource(int poolSize, ConnectionFactory connectionFactory) {
+        this(poolSize, connectionFactory, null, null);
+    }
+
+    /**
+     * Creates a resource that opens connections with the given credentials.
+     *
+     * @param poolSize the maximum number of pooled connections
+     * @param connectionFactory the factory used to create the connections
+     * @param username the user name, may be {@code null}
+     * @param password the password, may be {@code null}
+     */
+    public DefaultConnectionResource(int poolSize, ConnectionFactory connectionFactory, String username, String password) {
+        this(poolSize, connectionFactory, username, password, null);
+    }
+
+    /**
+     * Creates a resource that opens connections with the given credentials
+     * and client id.
+     *
+     * @param poolSize the maximum number of pooled connections
+     * @param connectionFactory the factory used to create the connections
+     * @param username the user name, may be {@code null}
+     * @param password the password, may be {@code null}
+     * @param connectionId the JMS client id set on each connection, may be {@code null} or empty
+     */
+    public DefaultConnectionResource(int poolSize, ConnectionFactory connectionFactory, String username, String password, String connectionId) {
+        super(poolSize);
+        this.connectionFactory = connectionFactory;
+        this.username = username;
+        this.password = password;
+        this.connectionId = connectionId;
+    }
+
+    @Override
+    public Connection borrowConnection() throws Exception {
+        return this.borrowObject();
+    }
+
+    @Override
+    public Connection borrowConnection(long timeout) throws Exception {
+        return this.borrowObject(timeout);
+    }
+
+    @Override
+    public void returnConnection(Connection connection) throws Exception {
+        returnObject(connection);
+    }
+
+    /**
+     * Opens and starts a new connection.
+     * <p>
+     * Credentials are only used when both username and password are set. If
+     * setting the client id or starting the connection fails, the connection
+     * is closed before the failure is rethrown so it does not leak.
+     *
+     * @return the started connection, or {@code null} when no connection
+     *         factory is configured
+     * @throws Exception if the connection cannot be created, configured or started
+     */
+    @Override
+    protected Connection createObject() throws Exception {
+        if (connectionFactory == null) {
+            return null;
+        }
+
+        final Connection connection;
+        if (getUsername() != null && getPassword() != null) {
+            connection = connectionFactory.createConnection(getUsername(), getPassword());
+        } else {
+            connection = connectionFactory.createConnection();
+        }
+        if (connection == null) {
+            return null;
+        }
+
+        try {
+            if (ObjectHelper.isNotEmpty(getConnectionId())) {
+                connection.setClientID(getConnectionId());
+            }
+            connection.start();
+        } catch (Exception e) {
+            // The connection is unusable; release it before reporting the failure.
+            try {
+                connection.close();
+            } catch (Exception closeFailure) {
+                logger.debug("Unable to close a connection that failed to initialize", closeFailure);
+            }
+            throw e;
+        }
+        return connection;
+    }
+
+    /**
+     * Stops and closes the connection; {@code close()} is attempted even
+     * when {@code stop()} fails.
+     *
+     * @param connection the connection to dispose of, ignored when {@code null}
+     * @throws Exception if stopping or closing the connection fails
+     */
+    @Override
+    protected void destroyObject(Connection connection) throws Exception {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.stop();
+        } finally {
+            connection.close();
+        }
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A fixed-capacity pool of reusable objects backed by a {@link BlockingQueue}.
+ * <p>
+ * {@link #fillPool()} has to be called before objects are borrowed or
+ * returned. Subclasses supply object creation and clean-up through
+ * {@link #createObject()} and {@link #destroyObject(Object)}.
+ *
+ * @param <T> the type of the pooled objects
+ */
+public abstract class ObjectPool<T> {
+
+    private static final int DEFAULT_POOL_SIZE = 1;
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+    private BlockingQueue<T> objects;
+    private int maxSize = DEFAULT_POOL_SIZE;
+    private AtomicInteger poolCount = new AtomicInteger();
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Creates a pool with the default size of one object.
+     */
+    public ObjectPool() {
+        this(DEFAULT_POOL_SIZE);
+    }
+
+    /**
+     * Creates a pool holding at most {@code poolSize} objects.
+     *
+     * @param poolSize the capacity of the pool
+     */
+    public ObjectPool(int poolSize) {
+        this.maxSize = poolSize;
+    }
+
+    /**
+     * Allocates the backing queue and tries to create {@link #getMaxSize()}
+     * objects. A failed creation is logged and skipped, so the pool may end
+     * up holding fewer objects than its capacity.
+     */
+    public void fillPool() {
+        objects = new ArrayBlockingQueue<T>(getMaxSize(), false);
+        for (int i = 0; i < maxSize; i++) {
+            try {
+                T t = createObject();
+                objects.add(t);
+                poolCount.incrementAndGet();
+            } catch (Exception e) {
+                logger.error("Unable to create Object and add it to the pool. Reason: "
+                             + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Removes every object currently in the pool and destroys it.
+     * <p>
+     * Does nothing when the pool was never filled. All pooled objects are
+     * destroyed even if destroying one of them fails; the first failure is
+     * rethrown once the pool is empty and later ones are logged.
+     *
+     * @throws Exception the first failure reported by {@link #destroyObject(Object)}
+     */
+    public void drainPool() throws Exception {
+        getLock().writeLock().lock();
+        try {
+            if (objects == null) {
+                // fillPool() was never called, nothing to release
+                return;
+            }
+            Exception firstFailure = null;
+            T pooled;
+            while ((pooled = objects.poll()) != null) {
+                try {
+                    destroyObject(pooled);
+                } catch (Exception e) {
+                    if (firstFailure == null) {
+                        firstFailure = e;
+                    } else {
+                        logger.warn("Unable to destroy pooled object while draining the pool", e);
+                    }
+                }
+            }
+            if (firstFailure != null) {
+                throw firstFailure;
+            }
+        } finally {
+            getLock().writeLock().unlock();
+        }
+    }
+
+    /**
+     * Implement to create new objects of type T when the pool is initialized
+     * empty.
+     *
+     * @return the newly created object
+     * @throws Exception if the object cannot be created
+     */
+    protected abstract T createObject() throws Exception;
+
+    /**
+     * Clean up pool objects
+     *
+     * @param t the object to clean up
+     * @throws Exception if the object cannot be cleaned up
+     */
+    protected abstract void destroyObject(T t) throws Exception;
+
+    /**
+     * Borrows an object, waiting up to 1000 milliseconds for one to become
+     * available.
+     *
+     * @return the borrowed object, or {@code null} if none became available in time
+     * @throws Exception if the wait is interrupted
+     */
+    public T borrowObject() throws Exception {
+        return borrowObject(1000);
+    }
+
+    /**
+     * Borrows an object, waiting up to the given time for one to become
+     * available.
+     * <p>
+     * Only the shared read lock is held while waiting: the queue is itself
+     * thread-safe, so concurrent borrowers do not have to queue up behind
+     * each other's timeouts, while {@link #drainPool()} (which takes the
+     * write lock) is still kept out.
+     *
+     * @param timeout the maximum time to wait, in milliseconds
+     * @return the borrowed object, or {@code null} if none became available in time
+     * @throws Exception if the wait is interrupted
+     */
+    public T borrowObject(long timeout) throws Exception {
+        getLock().readLock().lock();
+        try {
+            return objects.poll(timeout, TimeUnit.MILLISECONDS);
+        } finally {
+            getLock().readLock().unlock();
+        }
+    }
+
+    /**
+     * Puts an object back into the pool.
+     *
+     * @param object the object to return
+     * @throws Exception if the pool rejects the object, e.g. because it is already full
+     */
+    public void returnObject(T object) throws Exception {
+        this.objects.add(object);
+    }
+
+    /**
+     * @return the number of objects currently available in the pool
+     */
+    int size() {
+        return objects.size();
+    }
+
+    /**
+     * Gets the ReadWriteLock value of lock for this instance of ObjectPool.
+     *
+     * @return the lock
+     */
+    protected ReadWriteLock getLock() {
+        return lock;
+    }
+
+    /**
+     * Gets the int value of maxSize for this instance of ObjectPool.
+     *
+     * @return the maxSize
+     */
+    public int getMaxSize() {
+        return maxSize;
+    }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+
+import org.apache.camel.component.sjms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
+
+/**
+ * A pool of JMS {@link Session}s created on connections borrowed from a
+ * {@link ConnectionResource}.
+ */
+public class SessionPool extends ObjectPool<Session> {
+
+    private ConnectionResource connectionResource;
+    private boolean transacted;
+    private SessionAcknowledgementType acknowledgeMode = SessionAcknowledgementType.AUTO_ACKNOWLEDGE;
+
+    /**
+     * Creates a pool whose sessions are created on connections borrowed
+     * from the given resource.
+     *
+     * @param poolSize the maximum number of pooled sessions
+     * @param connectionResource the source of connections
+     */
+    public SessionPool(int poolSize, ConnectionResource connectionResource) {
+        super(poolSize);
+        this.connectionResource = connectionResource;
+    }
+
+    /**
+     * Creates a pool without a connection resource.
+     *
+     * @param poolSize the maximum number of pooled sessions
+     */
+    public SessionPool(int poolSize) {
+        super(poolSize);
+    }
+
+    /**
+     * Creates a session on a connection borrowed from the connection
+     * resource; the connection is handed back whether or not the session
+     * could be created.
+     * <p>
+     * Transacted pools always create transacted sessions. Otherwise the
+     * session uses the configured acknowledge mode.
+     *
+     * @return the new session, or {@code null} when the pool is not
+     *         transacted but the acknowledge mode is {@code SESSION_TRANSACTED}
+     * @throws IllegalStateException if no connection could be borrowed
+     * @throws Exception if the session cannot be created or the connection
+     *             cannot be returned
+     */
+    @Override
+    protected Session createObject() throws Exception {
+        final Connection connection = getConnectionResource().borrowConnection(5000);
+        if (connection == null) {
+            // Returning a null connection to the resource would only fail later and hide the cause.
+            throw new IllegalStateException("Unable to borrow a JMS connection to create a session");
+        }
+        try {
+            if (transacted) {
+                return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+            }
+            switch (acknowledgeMode) {
+            case CLIENT_ACKNOWLEDGE:
+                return connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE);
+            case DUPS_OK_ACKNOWLEDGE:
+                return connection.createSession(transacted, Session.DUPS_OK_ACKNOWLEDGE);
+            case AUTO_ACKNOWLEDGE:
+                return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+            default:
+                // do nothing here.
+                return null;
+            }
+        } finally {
+            getConnectionResource().returnConnection(connection);
+        }
+    }
+
+    /**
+     * Resets and closes the session. The session is closed even when
+     * clearing the listener fails; a failed rollback of a transacted
+     * session is only logged.
+     *
+     * @param session the session to dispose of, ignored when {@code null}
+     * @throws Exception if resetting or closing the session fails
+     */
+    @Override
+    protected void destroyObject(Session session) throws Exception {
+        if (session == null) {
+            return;
+        }
+        try {
+            // lets reset the session
+            session.setMessageListener(null);
+
+            if (transacted) {
+                try {
+                    session.rollback();
+                } catch (JMSException e) {
+                    logger.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
+                }
+            }
+        } finally {
+            session.close();
+        }
+    }
+
+    /**
+     * Gets the SessionAcknowledgementType value of acknowledgeMode for this instance of SessionPool.
+     *
+     * @return the acknowledge mode
+     */
+    public final SessionAcknowledgementType getAcknowledgeMode() {
+        return acknowledgeMode;
+    }
+
+    /**
+     * Sets the SessionAcknowledgementType value of acknowledgeMode for this instance of SessionPool.
+     *
+     * @param acknowledgeMode Sets SessionAcknowledgementType, default is AUTO_ACKNOWLEDGE
+     */
+    public final void setAcknowledgeMode(SessionAcknowledgementType acknowledgeMode) {
+        this.acknowledgeMode = acknowledgeMode;
+    }
+
+    /**
+     * Gets the boolean value of transacted for this instance of SessionPool.
+     *
+     * @return the transacted
+     */
+    public final boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Sets the boolean value of transacted for this instance of SessionPool.
+     *
+     * @param transacted Sets boolean, default is {@code false}
+     */
+    public final void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
+    /**
+     * Gets the ConnectionResource value of connectionResource for this instance of SessionPool.
+     *
+     * @return the connectionResource
+     */
+    public ConnectionResource getConnectionResource() {
+        return connectionResource;
+    }
+
+    /**
+     * Obtains the {@link XAResource} of the given XA session.
+     */
+    protected XAResource createXaResource(XASession session) throws JMSException {
+        return session.getXAResource();
+    }
+
+
+//    protected class Synchronization implements javax.transaction.Synchronization {
+//        private final XASession session;
+//
+//        private Synchronization(XASession session) {
+//            this.session = session;
+//        }
+//
+//        public void beforeCompletion() {
+//        }
+//
+//        public void afterCompletion(int status) {
+//            try {
+//                // This will return session to the pool.
+//                session.setIgnoreClose(false);
+//                session.close();
+//                session.setIsXa(false);
+//            } catch (JMSException e) {
+//                throw new RuntimeException(e);
+//            }
+//        }
+//    }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
+
+/**
+ * The InOnlyProducer is responsible for publishing messages to the JMS
+ * {@link Destination} for the value specified in the destinationName field.
+ */
+public class InOnlyProducer extends SjmsProducer {
+
+ public InOnlyProducer(SjmsEndpoint endpoint) {
+ super(endpoint);
+ }
+
+ /*
+ * @see org.apache.camel.component.sjms.SjmsProducer#doCreateProducerModel()
+ *
+ * @return
+ * @throws Exception
+ */
+ public MessageProducerResources doCreateProducerModel() throws Exception {
+ Connection conn = getConnectionResource().borrowConnection();
+ Session session = null;
+ if (isEndpointTransacted()) {
+ session = conn.createSession(true, getAcknowledgeMode());
+ } else {
+ session = conn.createSession(false, getAcknowledgeMode());
+ }
+ MessageProducer messageProducer = null;
+ if (isTopic()) {
+ messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+ } else {
+ messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+ }
+ getConnectionResource().returnConnection(conn);
+ return new MessageProducerResources(session, messageProducer);
+ }
+
+ @Override
+ public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
+ if (getProducers() != null) {
+ MessageProducerResources producer = getProducers().borrowObject();
+
+ if (isEndpointTransacted()) {
+ exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+ }
+
+ Message message = JmsMessageHelper.createMessage(exchange, producer.getSession());
+ producer.getMessageProducer().send(message);
+ getProducers().returnObject(producer);
+ callback.done(isSynchronous());
+ }
+ }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.pool.ObjectPool;
+import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
+import org.apache.camel.util.ObjectHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
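+/**
+ * A {@link SjmsProducer} for the InOut (request/reply) exchange pattern. Each
+ * request is sent with a JMS correlation id and a reply-to destination, and the
+ * sending thread then waits on an {@link Exchanger} until the correlated reply
+ * arrives or the response time out elapses.
+ */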
+public class InOutProducer extends SjmsProducer {
+
+    /**
+     * Exchangers of the requests that are waiting for a reply, keyed by JMS
+     * correlation id.
+     * <p/>
+     * We use a {@link ReadWriteLock} to manage the {@link TreeMap} in place
+     * of a {@link ConcurrentMap} because of significant performance gains.
+     *
+     * TODO Externalize the Exchanger Map to a store object
+     */
+    private static Map<String, Exchanger<Object>> exchangerMap = new TreeMap<String, Exchanger<Object>>();
+    /**
+     * Guards {@link #exchangerMap}. The map is static and therefore shared by
+     * every producer instance, so the lock must be static as well: with one
+     * lock per instance two producers could modify the map at the same time.
+     */
+    private static ReadWriteLock lock = new ReentrantReadWriteLock();
+
+
+    /**
+     * A pool of {@link MessageConsumerResource} objects that are the
+     * reply consumers.
+     * <p/>
+     * Every pooled consumer carries a {@link MessageListener} that hands an
+     * incoming reply to the {@link Exchanger} registered under the reply's
+     * JMS correlation id.
+     *
+     * TODO Externalize
+     */
+    protected class MessageConsumerPool extends ObjectPool<MessageConsumerResource> {
+
+        /**
+         * Creates the pool.
+         *
+         * @param poolSize the size handed to the underlying {@link ObjectPool}
+         */
+        public MessageConsumerPool(int poolSize) {
+            super(poolSize);
+        }
+
+        /**
+         * Creates a reply consumer on a session of its own. The consumer
+         * listens on the named reply-to destination when one is configured
+         * and on a temporary destination otherwise.
+         *
+         * @return the consumer together with its session and destination
+         * @throws Exception if any of the JMS resources cannot be created
+         */
+        @Override
+        protected MessageConsumerResource createObject() throws Exception {
+            Connection conn = getConnectionResource().borrowConnection();
+            Session session;
+            try {
+                if (isEndpointTransacted()) {
+                    session = conn.createSession(true, Session.SESSION_TRANSACTED);
+                } else {
+                    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                }
+            } finally {
+                // Return the connection even when the session could not be
+                // created, otherwise the borrowed connection is leaked.
+                getConnectionResource().returnConnection(conn);
+            }
+            Destination replyToDestination;
+            if (ObjectHelper.isEmpty(getNamedReplyTo())) {
+                replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic());
+            } else {
+                replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic());
+            }
+            MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true);
+            messageConsumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    logger.debug("Message Received in the Consumer Pool");
+                    logger.debug(" Message : {}", message);
+                    try {
+                        String correlationId = message.getJMSCorrelationID();
+                        if (correlationId == null) {
+                            // The TreeMap cannot be queried with a null key.
+                            logger.warn("Discarding reply without a JMSCorrelationID: {}", message);
+                            return;
+                        }
+                        Exchanger<Object> exchanger;
+                        // Writers modify the map under the write lock, so
+                        // the lookup has to hold the read lock.
+                        lock.readLock().lock();
+                        try {
+                            exchanger = exchangerMap.get(correlationId);
+                        } finally {
+                            lock.readLock().unlock();
+                        }
+                        if (exchanger == null) {
+                            // No request is registered under this id.
+                            logger.warn("Discarding reply with unknown JMSCorrelationID: {}", correlationId);
+                            return;
+                        }
+                        exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        logger.warn("Interrupted while handing the reply over to the waiting producer", e);
+                    } catch (Exception e) {
+                        // A MessageListener should not throw, so report the
+                        // failure instead of silently dropping it.
+                        logger.warn("Unable to hand the reply over to the waiting producer", e);
+                    }
+                }
+            });
+            return new MessageConsumerResource(session, messageConsumer, replyToDestination);
+        }
+
+        /**
+         * Closes the consumer and its session. A transacted session is
+         * rolled back before it is closed.
+         *
+         * @param model the resource to release
+         * @throws Exception if the consumer or the session cannot be closed
+         */
+        @Override
+        protected void destroyObject(MessageConsumerResource model) throws Exception {
+            try {
+                if (model.getMessageConsumer() != null) {
+                    model.getMessageConsumer().close();
+                }
+            } finally {
+                // Close the session even when closing the consumer failed.
+                if (model.getSession() != null) {
+                    if (model.getSession().getTransacted()) {
+                        try {
+                            model.getSession().rollback();
+                        } catch (Exception e) {
+                            // Do nothing. Just make sure we are cleaned up
+                        }
+                    }
+                    model.getSession().close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Immutable holder that keeps a reply {@link MessageConsumer} together
+     * with the {@link Session} and the reply-to {@link Destination} it is
+     * handed at construction time.
+     */
+    protected class MessageConsumerResource {
+        private final Destination replyToDestination;
+        private final MessageConsumer messageConsumer;
+        private final Session session;
+
+        /**
+         * @param session the session of the consumer
+         * @param messageConsumer the reply consumer
+         * @param replyToDestination the destination replies are expected on
+         */
+        public MessageConsumerResource(Session session, MessageConsumer messageConsumer, Destination replyToDestination) {
+            this.replyToDestination = replyToDestination;
+            this.messageConsumer = messageConsumer;
+            this.session = session;
+        }
+
+        /** @return the destination replies are expected on */
+        public Destination getReplyToDestination() {
+            return replyToDestination;
+        }
+
+        /** @return the reply consumer */
+        public MessageConsumer getMessageConsumer() {
+            return messageConsumer;
+        }
+
+        /** @return the session of the consumer */
+        public Session getSession() {
+            return session;
+        }
+    }
+
+    /**
+     * Immutable pair of an {@link Exchange} and the {@link AsyncCallback}
+     * that belongs to it.
+     */
+    protected class InOutResponseContainer {
+        private final AsyncCallback callback;
+        private final Exchange exchange;
+
+        /**
+         * @param exchange the exchange to hold
+         * @param callback the callback to hold
+         */
+        public InOutResponseContainer(Exchange exchange, AsyncCallback callback) {
+            this.callback = callback;
+            this.exchange = exchange;
+        }
+
+        /** @return the callback given at construction time */
+        public AsyncCallback getCallback() {
+            return callback;
+        }
+
+        /** @return the exchange given at construction time */
+        public Exchange getExchange() {
+            return exchange;
+        }
+    }
+
+
+    /**
+     * {@link MessageListener} that hands every message it receives to the
+     * {@link Exchanger} given at construction time, waiting at most the
+     * response time out for the other party.
+     */
+    protected class InternalTempDestinationListener implements MessageListener {
+        private final Logger tempLogger = LoggerFactory.getLogger(InternalTempDestinationListener.class);
+        private final Exchanger<Object> exchanger;
+
+        /**
+         * @param exchanger the exchanger received messages are handed to
+         */
+        public InternalTempDestinationListener(Exchanger<Object> exchanger) {
+            super();
+            this.exchanger = exchanger;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            if (tempLogger.isDebugEnabled()) {
+                tempLogger.debug("Message Received in the Consumer Pool");
+                tempLogger.debug(" Message : {}", message);
+            }
+            try {
+                exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the thread that owns this listener.
+                Thread.currentThread().interrupt();
+                tempLogger.warn("Interrupted while handing the reply over to the waiting producer", e);
+            } catch (Exception e) {
+                // A MessageListener should not throw, so report the failure
+                // instead of silently dropping it.
+                tempLogger.warn("Unable to hand the reply over to the waiting producer", e);
+            }
+        }
+    }
+
+    /** Pool of the reply consumers; created in doStart() and cleared in doStop(). */
+    private MessageConsumerPool consumers;
+
+    /**
+     * Creates the producer. The consumer pool is not created here but when
+     * the producer is started.
+     *
+     * @param endpoint the endpoint this producer belongs to
+     */
+    public InOutProducer(SjmsEndpoint endpoint) {
+        // The former bare endpoint.getConsumerCount() call discarded its
+        // result and has been removed.
+        super(endpoint);
+    }
+
+    /**
+     * Logs which kind of reply-to destination is in use, creates and fills
+     * the reply consumer pool when there is none yet, and then starts the
+     * parent producer.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        if (log.isDebugEnabled()) {
+            if (ObjectHelper.isEmpty(getNamedReplyTo())) {
+                log.debug("No reply to destination is defined. Using temporary destinations.");
+            } else {
+                log.debug("Using {} as the reply to destination.", getNamedReplyTo());
+            }
+        }
+
+        if (getConsumers() == null) {
+            // Publish the pool first so it stays set even if filling it fails.
+            setConsumers(new MessageConsumerPool(getConsumerCount()));
+            MessageConsumerPool pool = getConsumers();
+            pool.fillPool();
+        }
+
+        super.doStart();
+    }
+
+    /**
+     * Stops the parent producer and then drains and discards the reply
+     * consumer pool, if there is one.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        MessageConsumerPool pool = getConsumers();
+        if (pool == null) {
+            return;
+        }
+        pool.drainPool();
+        setConsumers(null);
+    }
+
+    /**
+     * Creates a session and a message producer for the endpoint destination.
+     * The session is transacted when the endpoint is.
+     *
+     * @return the session and the producer created on it
+     * @throws Exception if the connection, session or producer cannot be obtained
+     */
+    @Override
+    public MessageProducerResources doCreateProducerModel() throws Exception {
+        // The connection is only needed while the session and producer are
+        // being created. Hand it back in a finally block so that a failure in
+        // either step does not leak the borrowed connection.
+        Connection conn = getConnectionResource().borrowConnection();
+        try {
+            Session session = conn.createSession(isEndpointTransacted(), getAcknowledgeMode());
+            MessageProducer messageProducer;
+            if (isTopic()) {
+                messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+            } else {
+                messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+            }
+            return new MessageProducerResources(session, messageProducer);
+        } finally {
+            getConnectionResource().returnConnection(conn);
+        }
+    }
+
+ /**
+ * TODO Add override javadoc
+ * TODO time out is actually double as it waits for the producer and then waits for the response. Use an atomiclong to manage the countdown
+ *
+ * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
+ *
+ * @param exchange
+ * @param callback
+ * @throws Exception
+ */
+ @Override
+ public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
+ if (getProducers() != null) {
+ MessageProducerResources producer = null;
+ try {
+ producer = getProducers().borrowObject(getResponseTimeOut());
+ // producer = getProducers().borrowObject();
+ } catch (Exception e1) {
+ log.warn("The producer pool is exhausted. Consider setting producerCount to a higher value or disable the fixed size of the pool by setting fixedResourcePool=false.");
+ exchange.setException(new Exception("Producer Resource Pool is exhausted"));
+ }
+ if (producer != null) {
+
+ if (isEndpointTransacted()) {
+ exchange.getUnitOfWork()
+ .addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+ }
+
+ Message request = JmsMessageHelper.createMessage(exchange, producer.getSession());
+ // TODO just set the correlation id don't get it from the
+ // message
+ String correlationId = null;
+ if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) {
+ correlationId = UUID.randomUUID().toString().replace("-", "");
+ } else {
+ correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+ }
+ Object responseObject = null;
+ Exchanger<Object> messageExchanger = new Exchanger<Object>();
+ JmsMessageHelper.setCorrelationId(request, correlationId);
+ try {
+ lock.writeLock().lock();
+ exchangerMap.put(correlationId, messageExchanger);
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
+ JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+ consumers.returnObject(consumer);
+ producer.getMessageProducer().send(request);
+
+ try {
+ getProducers().returnObject(producer);
+ } catch (Exception exception) {
+ // thrown if the pool is full. safe to ignore.
+ }
+
+ try {
+ responseObject = messageExchanger.exchange(null, getResponseTimeOut(),
+ TimeUnit.MILLISECONDS);
+
+ try {
+ lock.writeLock().lock();
+ exchangerMap.remove(correlationId);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.debug("Exchanger was interrupted while waiting on response", e);
+ exchange.setException(e);
+ } catch (TimeoutException e) {
+ log.debug("Exchanger timed out while waiting on response", e);
+ exchange.setException(e);
+ }
+
+ if (exchange.getException() == null) {
+ if (responseObject instanceof Throwable) {
+ exchange.setException((Throwable)responseObject);
+ } else if (responseObject instanceof Message) {
+ Message response = (Message)responseObject;
+ JmsMessageHelper.populateExchange(response, exchange, true);
+ } else {
+ exchange.setException(new CamelException("Unknown response type: " + responseObject));
+ }
+ }
+ }
+
+ callback.done(isSynchronous());
+ }
+ }
+
+    /**
+     * Replaces the pool of reply consumers.
+     *
+     * @param consumers the pool to use from now on, may be {@code null}
+     */
+    public void setConsumers(MessageConsumerPool consumers) {
+        this.consumers = consumers;
+    }
+
+    /**
+     * @return the current pool of reply consumers, or {@code null} when none is set
+     */
+    public MessageConsumerPool getConsumers() {
+        return this.consumers;
+    }
+}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Synchronization} that ends the local transaction of a JMS
+ * {@link Session} together with the exchange: the session is committed when
+ * the exchange completes and rolled back when it fails. Sessions that are
+ * {@code null} or not transacted are left untouched.
+ */
+public class SessionTransactionSynchronization implements Synchronization {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Session session;
+
+    /**
+     * @param session the session to commit or roll back, may be {@code null}
+     */
+    public SessionTransactionSynchronization(Session session) {
+        this.session = session;
+    }
+
+    /**
+     * Rolls the session back. A failing rollback is only logged.
+     *
+     * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
+     *
+     * @param exchange the failed exchange
+     */
+    @Override
+    public void onFailure(Exchange exchange) {
+        if (log.isDebugEnabled()) {
+            log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+        }
+        try {
+            if (session != null && session.getTransacted()) {
+                session.rollback();
+            }
+        } catch (JMSException e) {
+            // Pass the exception itself so the stack trace is not lost.
+            log.warn("Failed to rollback the session: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Commits the session. A failing commit is logged and stored on the
+     * exchange.
+     *
+     * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange)
+     *
+     * @param exchange the completed exchange
+     */
+    @Override
+    public void onComplete(Exchange exchange) {
+        if (log.isDebugEnabled()) {
+            log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+        }
+        try {
+            if (session != null && session.getTransacted()) {
+                session.commit();
+            }
+        } catch (JMSException e) {
+            // Pass the exception itself so the stack trace is not lost.
+            log.warn("Failed to commit the session: " + e.getMessage(), e);
+            exchange.setException(e);
+        }
+    }
+
+}