You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/01/10 11:51:34 UTC
[1/2] git commit: CAMEL-7107: avoid the NPE in case of connection loss,
with thanks to Marios
Updated Branches:
refs/heads/camel-2.12.x 3ad782cfe -> 3cc1f5c65
CAMEL-7107: avoid the NPE in case of connection loss, with thanks to Marios
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cc1f5c6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cc1f5c6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cc1f5c6
Branch: refs/heads/camel-2.12.x
Commit: 3cc1f5c6564735d71d5da8e702e13cc84e10f81d
Parents: cef0dd0
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Jan 10 17:57:07 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Jan 10 18:51:18 2014 +0800
----------------------------------------------------------------------
.../component/sjms/producer/InOnlyProducer.java | 35 +++++++++++---------
1 file changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3cc1f5c6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index df689b2..84eb1f5 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.sjms.producer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import javax.jms.Connection;
@@ -25,9 +26,9 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.BatchMessage;
-import org.apache.camel.component.sjms.SjmsEndpoint;
import org.apache.camel.component.sjms.SjmsProducer;
import org.apache.camel.component.sjms.TransactionCommitStrategy;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
@@ -40,7 +41,7 @@ import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
*/
public class InOnlyProducer extends SjmsProducer {
- public InOnlyProducer(SjmsEndpoint endpoint) {
+ public InOnlyProducer(final Endpoint endpoint) {
super(endpoint);
}
@@ -55,11 +56,9 @@ public class InOnlyProducer extends SjmsProducer {
Connection conn = null;
try {
conn = getConnectionResource().borrowConnection();
-
TransactionCommitStrategy commitStrategy = null;
- Session session = null;
- MessageProducer messageProducer = null;
-
+ Session session;
+
if (isEndpointTransacted()) {
if (getCommitStrategy() != null) {
commitStrategy = getCommitStrategy();
@@ -70,6 +69,8 @@ public class InOnlyProducer extends SjmsProducer {
} else {
session = conn.createSession(false, getAcknowledgeMode());
}
+
+ MessageProducer messageProducer;
if (isTopic()) {
messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
} else {
@@ -94,16 +95,16 @@ public class InOnlyProducer extends SjmsProducer {
* @throws Exception
*/
@Override
- public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
- List<Message> messages = new ArrayList<Message>();
+ public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
+ Collection<Message> messages = new ArrayList<Message>(1);
MessageProducerResources producer = getProducers().borrowObject();
try {
- if (getProducers() != null) {
+ if (producer != null) {
if (exchange.getIn().getBody() != null) {
if (exchange.getIn().getBody() instanceof List) {
- List<?> payload = (List<?>)exchange.getIn().getBody();
- for (Object object : payload) {
- Message message = null;
+ Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody();
+ for (final Object object : payload) {
+ Message message;
if (BatchMessage.class.isInstance(object)) {
BatchMessage<?> batchMessage = (BatchMessage<?>)object;
message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
@@ -124,14 +125,18 @@ public class InOnlyProducer extends SjmsProducer {
if (isEndpointTransacted()) {
exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
}
- for (Message message : messages) {
+ for (final Message message : messages) {
producer.getMessageProducer().send(message);
}
+ } else {
+ exchange.setException(new Exception("Unable to send message: connection not available"));
}
} catch (Exception e) {
- exchange.setException(new Exception("Unable to complet sending the message: " + e.getLocalizedMessage()));
+ exchange.setException(new Exception("Unable to complete sending the message: " + e.getLocalizedMessage()));
} finally {
- getProducers().returnObject(producer);
+ if (producer != null) {
+ getProducers().returnObject(producer);
+ }
callback.done(isSynchronous());
}
}
[2/2] git commit: CAMEL-6362: Consumers should always use dedicated
Sessions
Posted by ni...@apache.org.
CAMEL-6362: Consumers should always use dedicated Sessions
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cef0dd05
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cef0dd05
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cef0dd05
Branch: refs/heads/camel-2.12.x
Commit: cef0dd05fd66f9951771b307d17476a24861c2c0
Parents: 3ad782c
Author: Scott England-Sullivan <su...@apache.org>
Authored: Tue Oct 29 11:17:35 2013 -0500
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Jan 10 18:51:18 2014 +0800
----------------------------------------------------------------------
.../camel/component/sjms/SjmsConsumer.java | 75 +++++------
.../camel/component/sjms/SjmsEndpoint.java | 42 +-----
.../camel/component/sjms/SjmsProducer.java | 5 +-
.../component/sjms/producer/InOnlyProducer.java | 102 +++++++++------
.../component/sjms/producer/InOutProducer.java | 127 ++++++++++++-------
.../InOnlyTopicDurableConsumerTest.java | 4 +-
6 files changed, 180 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cef0dd05/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 806537d..5b01c3a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
+import org.apache.camel.CamelException;
import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
@@ -32,7 +33,6 @@ import org.apache.camel.component.sjms.consumer.InOutMessageHandler;
import org.apache.camel.component.sjms.jms.ConnectionResource;
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
import org.apache.camel.component.sjms.jms.ObjectPool;
-import org.apache.camel.component.sjms.jms.SessionPool;
import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy;
import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
@@ -66,12 +66,7 @@ public class SjmsConsumer extends DefaultConsumer {
*/
@Override
protected MessageConsumerResources createObject() throws Exception {
- MessageConsumerResources model = null;
- if (isTransacted() || getEndpoint().getExchangePattern().equals(ExchangePattern.InOut)) {
- model = createConsumerWithDedicatedSession();
- } else {
- model = createConsumerListener();
- }
+ MessageConsumerResources model = createConsumer();
return model;
}
@@ -168,39 +163,41 @@ public class SjmsConsumer extends DefaultConsumer {
* Creates a {@link MessageConsumerResources} with a dedicated
* {@link Session} required for transacted and InOut consumers.
*/
- private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception {
- Connection conn = getConnectionResource().borrowConnection();
- Session session = null;
- if (isTransacted()) {
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- } else {
- session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ @SuppressWarnings("unused")
+ private MessageConsumerResources createConsumer() throws Exception {
+ MessageConsumerResources answer = null;
+ Connection conn = null;
+ try {
+ conn = getConnectionResource().borrowConnection();
+
+ Session session = null;
+ MessageConsumer messageConsumer = null;
+ if (isTransacted()) {
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ } else {
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ messageConsumer = JmsObjectFactory.createMessageConsumer(session, getDestinationName(), getMessageSelector(), isTopic(), getDurableSubscriptionId());
+ MessageListener handler = createMessageHandler(session);
+ messageConsumer.setMessageListener(handler);
+
+ if (session == null) {
+ throw new CamelException("Message Consumer Creation Exception: Session is NULL");
+ }
+ if (messageConsumer == null) {
+ throw new CamelException("Message Consumer Creation Exception: MessageConsumer is NULL");
+ }
+ answer = new MessageConsumerResources(session, messageConsumer);
+ } catch (Exception e) {
+ log.error("Unable to create the MessageConsumer: " + e.getLocalizedMessage());
+ } finally {
+ if (conn != null) {
+ getConnectionResource().returnConnection(conn);
+ }
}
- MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, getDestinationName(), getMessageSelector(), isTopic(), getDurableSubscriptionId());
- MessageListener handler = createMessageHandler(session);
- messageConsumer.setMessageListener(handler);
- getConnectionResource().returnConnection(conn);
- return new MessageConsumerResources(session, messageConsumer);
+ return answer;
}
- /**
- * Creates a {@link MessageConsumerResources} with a shared {@link Session}
- * for non-transacted InOnly consumers.
- */
- private MessageConsumerResources createConsumerListener() throws Exception {
- Session queueSession = getSessionPool().borrowObject();
- MessageConsumer messageConsumer = null;
- if (isTopic()) {
- messageConsumer = JmsObjectFactory.createTopicConsumer(queueSession, getDestinationName(), getMessageSelector());
- } else {
- messageConsumer = JmsObjectFactory.createQueueConsumer(queueSession, getDestinationName(), getMessageSelector());
- }
- getSessionPool().returnObject(queueSession);
- // Don't pass in the session. Only needed if we are transacted
- MessageListener handler = createMessageHandler(null);
- messageConsumer.setMessageListener(handler);
- return new MessageConsumerResources(messageConsumer);
- }
/**
* Helper factory method used to create a MessageListener based on the MEP
@@ -253,10 +250,6 @@ public class SjmsConsumer extends DefaultConsumer {
return getEndpoint().getConnectionResource();
}
- protected SessionPool getSessionPool() {
- return getEndpoint().getSessions();
- }
-
public int getAcknowledgementMode() {
return getEndpoint().getAcknowledgementMode().intValue();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cef0dd05/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index edaa70c..63118ec 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -25,7 +25,6 @@ import org.apache.camel.Producer;
import org.apache.camel.component.sjms.jms.ConnectionResource;
import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
-import org.apache.camel.component.sjms.jms.SessionPool;
import org.apache.camel.component.sjms.producer.InOnlyProducer;
import org.apache.camel.component.sjms.producer.InOutProducer;
import org.apache.camel.impl.DefaultEndpoint;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
protected final Logger logger = LoggerFactory.getLogger(getClass());
- private SessionPool sessions;
+
@UriParam
private boolean synchronous = true;
@UriParam
@@ -95,28 +94,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
@Override
protected void doStart() throws Exception {
super.doStart();
-
- //
- // TODO since we only need a session pool for one use case, find a
- // better way
- //
- // We only create a session pool when we are not transacted.
- // Transacted listeners or producers need to be paired with the
- // Session that created them.
- if (!isTransacted() && getExchangePattern().equals(ExchangePattern.InOnly)) {
- sessions = new SessionPool(getSessionCount(), getConnectionResource());
-
- // TODO fix the string hack
- sessions.setAcknowledgeMode(SessionAcknowledgementType.valueOf(getAcknowledgementMode() + ""));
- getSessions().fillPool();
- }
}
@Override
protected void doStop() throws Exception {
- if (getSessions() != null) {
- getSessions().drainPool();
- }
super.doStop();
}
@@ -169,25 +150,6 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
}
/**
- * Returns a SessionPool if available.
- *
- * @return the sessions
- */
- public SessionPool getSessions() {
- return sessions;
- }
-
- /**
- * SessionPool used by endpoints that do not require a dedicated session per
- * consumer or producer.
- *
- * @param sessions default null
- */
- public void setSessions(SessionPool sessions) {
- this.sessions = sessions;
- }
-
- /**
* Use to determine whether or not to process exchanges synchronously.
*
* @return true if endoint is synchronous, otherwise false
@@ -239,6 +201,7 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
*
* @return the sessionCount
*/
+ @Deprecated
public int getSessionCount() {
return sessionCount;
}
@@ -250,6 +213,7 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
*
* @param sessionCount the number of Session instances, default is 1
*/
+ @Deprecated
public void setSessionCount(int sessionCount) {
this.sessionCount = sessionCount;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cef0dd05/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index d4e1a1d..c5d9c1f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -157,7 +157,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
if (log.isDebugEnabled()) {
log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
}
-
+
try {
if (!isSynchronous()) {
if (log.isDebugEnabled()) {
@@ -189,10 +189,9 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
exchange.setException(e);
}
log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
-
+
return isSynchronous();
}
-
protected SjmsEndpoint getSjmsEndpoint() {
return (SjmsEndpoint)this.getEndpoint();
http://git-wip-us.apache.org/repos/asf/camel/blob/cef0dd05/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index ca49765..df689b2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -51,32 +51,44 @@ public class InOnlyProducer extends SjmsProducer {
*/
@Override
public MessageProducerResources doCreateProducerModel() throws Exception {
- Connection conn = getConnectionResource().borrowConnection();
- TransactionCommitStrategy commitStrategy = null;
- Session session = null;
- if (isEndpointTransacted()) {
- if (getCommitStrategy() != null) {
- commitStrategy = getCommitStrategy();
+ MessageProducerResources answer = null;
+ Connection conn = null;
+ try {
+ conn = getConnectionResource().borrowConnection();
+
+ TransactionCommitStrategy commitStrategy = null;
+ Session session = null;
+ MessageProducer messageProducer = null;
+
+ if (isEndpointTransacted()) {
+ if (getCommitStrategy() != null) {
+ commitStrategy = getCommitStrategy();
+ } else {
+ commitStrategy = new DefaultTransactionCommitStrategy();
+ }
+ session = conn.createSession(true, getAcknowledgeMode());
} else {
- commitStrategy = new DefaultTransactionCommitStrategy();
+ session = conn.createSession(false, getAcknowledgeMode());
+ }
+ if (isTopic()) {
+ messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+ } else {
+ messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+ }
+ answer = new MessageProducerResources(session, messageProducer, commitStrategy);
+ } catch (Exception e) {
+ log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage());
+ } finally {
+ if (conn != null) {
+ getConnectionResource().returnConnection(conn);
}
- session = conn.createSession(true, getAcknowledgeMode());
- } else {
- session = conn.createSession(false, getAcknowledgeMode());
- }
- MessageProducer messageProducer = null;
- if (isTopic()) {
- messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
- } else {
- messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
}
- getConnectionResource().returnConnection(conn);
- return new MessageProducerResources(session, messageProducer, commitStrategy);
+ return answer;
}
/*
- * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
- *
+ * @see
+ * org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
* @param exchange
* @param callback
* @throws Exception
@@ -85,34 +97,40 @@ public class InOnlyProducer extends SjmsProducer {
public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
List<Message> messages = new ArrayList<Message>();
MessageProducerResources producer = getProducers().borrowObject();
- if (getProducers() != null) {
- if (exchange.getIn().getBody() != null) {
- if (exchange.getIn().getBody() instanceof List) {
- List<?> payload = (List<?>)exchange.getIn().getBody();
- for (Object object : payload) {
- Message message = null;
- if (BatchMessage.class.isInstance(object)) {
- BatchMessage<?> batchMessage = (BatchMessage<?>)object;
- message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
- .getJmsKeyFormatStrategy());
- } else {
- message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+ try {
+ if (getProducers() != null) {
+ if (exchange.getIn().getBody() != null) {
+ if (exchange.getIn().getBody() instanceof List) {
+ List<?> payload = (List<?>)exchange.getIn().getBody();
+ for (Object object : payload) {
+ Message message = null;
+ if (BatchMessage.class.isInstance(object)) {
+ BatchMessage<?> batchMessage = (BatchMessage<?>)object;
+ message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
+ .getJmsKeyFormatStrategy());
+ } else {
+ message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+ }
+ messages.add(message);
}
+ } else {
+ Object payload = exchange.getIn().getBody();
+ Message message = JmsMessageHelper
+ .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
messages.add(message);
}
- } else {
- Object payload = exchange.getIn().getBody();
- Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
- messages.add(message);
}
- }
- if (isEndpointTransacted()) {
- exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
- }
- for (Message message : messages) {
- producer.getMessageProducer().send(message);
+ if (isEndpointTransacted()) {
+ exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
+ }
+ for (Message message : messages) {
+ producer.getMessageProducer().send(message);
+ }
}
+ } catch (Exception e) {
+ exchange.setException(new Exception("Unable to complet sending the message: " + e.getLocalizedMessage()));
+ } finally {
getProducers().returnObject(producer);
callback.done(isSynchronous());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cef0dd05/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index c98e136..0936ecf 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -79,40 +79,49 @@ public class InOutProducer extends SjmsProducer {
@Override
protected MessageConsumerResource createObject() throws Exception {
- Connection conn = getConnectionResource().borrowConnection();
+ MessageConsumerResource answer = null;
+ Connection conn = null;
Session session = null;
- if (isEndpointTransacted()) {
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- } else {
- session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- getConnectionResource().returnConnection(conn);
- Destination replyToDestination = null;
- if (ObjectHelper.isEmpty(getNamedReplyTo())) {
- replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic());
- } else {
- replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic());
- }
- MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true);
- messageConsumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- if (logger.isDebugEnabled()) {
- logger.debug("Message Received in the Consumer Pool");
- logger.debug(" Message : {}", message);
- }
- try {
- Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
- exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- ObjectHelper.wrapRuntimeCamelException(e);
- }
+ try {
+ conn = getConnectionResource().borrowConnection();
+ if (isEndpointTransacted()) {
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ } else {
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ Destination replyToDestination = null;
+ if (ObjectHelper.isEmpty(getNamedReplyTo())) {
+ replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic());
+ } else {
+ replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic());
}
- });
- MessageConsumerResource mcm = new MessageConsumerResource(session, messageConsumer, replyToDestination);
- return mcm;
+ MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true);
+ messageConsumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message Received in the Consumer Pool");
+ logger.debug(" Message : {}", message);
+ }
+ try {
+ Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
+ exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ ObjectHelper.wrapRuntimeCamelException(e);
+ }
+
+ }
+ });
+ answer = new MessageConsumerResource(session, messageConsumer, replyToDestination);
+ } catch (Exception e) {
+ log.error("Unable to create the MessageConsumerResource: " + e.getLocalizedMessage());
+ throw new CamelException(e);
+ } finally {
+ getConnectionResource().returnConnection(conn);
+ }
+ return answer;
}
@Override
@@ -227,21 +236,42 @@ public class InOutProducer extends SjmsProducer {
@Override
public MessageProducerResources doCreateProducerModel() throws Exception {
- Connection conn = getConnectionResource().borrowConnection();
- Session session = null;
- if (isEndpointTransacted()) {
- session = conn.createSession(true, getAcknowledgeMode());
- } else {
- session = conn.createSession(false, getAcknowledgeMode());
- }
- MessageProducer messageProducer = null;
- if (isTopic()) {
- messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
- } else {
- messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+ MessageProducerResources answer = null;
+ Connection conn = null;
+ try {
+ MessageProducer messageProducer = null;
+ Session session = null;
+
+ conn = getConnectionResource().borrowConnection();
+ if (isEndpointTransacted()) {
+ session = conn.createSession(true, getAcknowledgeMode());
+ } else {
+ session = conn.createSession(false, getAcknowledgeMode());
+ }
+ if (isTopic()) {
+ messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+ } else {
+ messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+ }
+
+ if (session == null) {
+ throw new CamelException("Message Consumer Creation Exception: Session is NULL");
+ }
+ if (messageProducer == null) {
+ throw new CamelException("Message Consumer Creation Exception: MessageProducer is NULL");
+ }
+
+ answer = new MessageProducerResources(session, messageProducer);
+
+ } catch (Exception e) {
+ log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage());
+ } finally {
+ if (conn != null) {
+ getConnectionResource().returnConnection(conn);
+ }
}
- getConnectionResource().returnConnection(conn);
- return new MessageProducerResources(session, messageProducer);
+
+ return answer;
}
/**
@@ -269,9 +299,9 @@ public class InOutProducer extends SjmsProducer {
if (isEndpointTransacted()) {
exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
}
-
+
Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy());
-
+
// TODO just set the correlation id don't get it from the
// message
String correlationId = null;
@@ -295,7 +325,8 @@ public class InOutProducer extends SjmsProducer {
consumers.returnObject(consumer);
producer.getMessageProducer().send(request);
- // Return the producer to the pool so another waiting producer can move forward
+ // Return the producer to the pool so another waiting producer
+ // can move forward
// without waiting on us to complete the exchange
try {
getProducers().returnObject(producer);
http://git-wip-us.apache.org/repos/asf/camel/blob/cef0dd05/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
index 9fa0a80..2420476 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
@@ -78,10 +78,10 @@ public class InOnlyTopicDurableConsumerTest extends CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("sjms:topic:foo?durableSubscriptionId=bar")
+ from("sjms:topic:foo?durableSubscriptionId=bar1")
.to("mock:result");
- from("sjms:topic:foo?durableSubscriptionId=bar")
+ from("sjms:topic:foo?durableSubscriptionId=bar2")
.to("mock:result2");
}
};