You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/08/21 04:23:07 UTC
[1/2] git commit: CAMEL-7727: Unify MessageProducerResources handling
into SjmsProducer
Repository: camel
Updated Branches:
refs/heads/master b0c572fe6 -> 2cff2f15f
CAMEL-7727: Unify MessageProducerResources handling into SjmsProducer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f683b0b7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f683b0b7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f683b0b7
Branch: refs/heads/master
Commit: f683b0b7b1933fa492ab4f7a456691d90b17698c
Parents: b0c572f
Author: Cristiano Nicolai <cr...@gmail.com>
Authored: Wed Aug 20 21:32:05 2014 +1000
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Aug 21 10:08:04 2014 +0800
----------------------------------------------------------------------
.../camel/component/sjms/SjmsProducer.java | 39 +++---
.../component/sjms/producer/InOnlyProducer.java | 53 ++++----
.../component/sjms/producer/InOutProducer.java | 130 +++++++++----------
3 files changed, 105 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 1a31818..3edc761 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -195,7 +195,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
public abstract MessageProducerResources doCreateProducerModel() throws Exception;
- public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception;
+ public abstract void sendMessage(Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception;
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
@@ -204,25 +204,30 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
}
try {
- if (!isSynchronous()) {
- if (log.isDebugEnabled()) {
- log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody());
- }
- getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessage(exchange, callback);
- } catch (Exception e) {
- ObjectHelper.wrapRuntimeCamelException(e);
+ final MessageProducerResources producer = getProducers().borrowObject(getResponseTimeOut());
+ if(producer==null){
+ exchange.setException(new Exception("Unable to send message: connection not available"));
+ } else {
+ if (!isSynchronous()) {
+ if (log.isDebugEnabled()) {
+ log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody());
+ }
+ getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ sendMessage(exchange, callback, producer);
+ } catch (Exception e) {
+ ObjectHelper.wrapRuntimeCamelException(e);
+ }
}
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(" Sending message synchronously: {}", exchange.getIn().getBody());
}
- });
- } else {
- if (log.isDebugEnabled()) {
- log.debug(" Sending message synchronously: {}", exchange.getIn().getBody());
+ sendMessage(exchange, callback, producer);
}
- sendMessage(exchange, callback);
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index e841e6b..c83546f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -91,41 +91,36 @@ public class InOnlyProducer extends SjmsProducer {
* @throws Exception
*/
@Override
- public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
- Collection<Message> messages = new ArrayList<Message>(1);
- MessageProducerResources producer = getProducers().borrowObject();
+ public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception {
try {
- if (producer != null) {
- if (exchange.getIn().getBody() != null) {
- if (exchange.getIn().getBody() instanceof List) {
- Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody();
- for (final Object object : payload) {
- Message message;
- if (BatchMessage.class.isInstance(object)) {
- BatchMessage<?> batchMessage = (BatchMessage<?>)object;
- message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
- .getJmsKeyFormatStrategy());
- } else {
- message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
- }
- messages.add(message);
+ Collection<Message> messages = new ArrayList<Message>(1);
+ if (exchange.getIn().getBody() != null) {
+ if (exchange.getIn().getBody() instanceof List) {
+ Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody();
+ for (final Object object : payload) {
+ Message message;
+ if (BatchMessage.class.isInstance(object)) {
+ BatchMessage<?> batchMessage = (BatchMessage<?>)object;
+ message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
+ .getJmsKeyFormatStrategy());
+ } else {
+ message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
}
- } else {
- Object payload = exchange.getIn().getBody();
- Message message = JmsMessageHelper
- .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
messages.add(message);
}
+ } else {
+ Object payload = exchange.getIn().getBody();
+ Message message = JmsMessageHelper
+ .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+ messages.add(message);
}
+ }
- if (isEndpointTransacted()) {
- exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
- }
- for (final Message message : messages) {
- producer.getMessageProducer().send(message);
- }
- } else {
- exchange.setException(new Exception("Unable to send message: connection not available"));
+ if (isEndpointTransacted()) {
+ exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
+ }
+ for (final Message message : messages) {
+ producer.getMessageProducer().send(message);
}
} catch (Exception e) {
exchange.setException(new Exception("Unable to complete sending the message: " + e.getLocalizedMessage()));
http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 2b93df7..605b15c 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -282,86 +282,74 @@ public class InOutProducer extends SjmsProducer {
* @throws Exception
*/
@Override
- public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
- if (getProducers() != null) {
- MessageProducerResources producer = null;
- try {
- producer = getProducers().borrowObject(getResponseTimeOut());
- } catch (Exception e1) {
- log.warn("The producer pool is exhausted. Consider setting producerCount to a higher value or disable the fixed size of the pool by setting fixedResourcePool=false.");
- exchange.setException(new Exception("Producer Resource Pool is exhausted"));
- }
- if (producer != null) {
-
- if (isEndpointTransacted()) {
- exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
- }
+ public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception {
+ if (isEndpointTransacted()) {
+ exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
+ }
- Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+ Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy());
- // TODO just set the correlation id don't get it from the
- // message
- String correlationId = null;
- if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) {
- correlationId = UUID.randomUUID().toString().replace("-", "");
- } else {
- correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
- }
- Object responseObject = null;
- Exchanger<Object> messageExchanger = new Exchanger<Object>();
- SjmsExchangeMessageHelper.setCorrelationId(request, correlationId);
- try {
- lock.writeLock().lock();
- exchangerMap.put(correlationId, messageExchanger);
- } finally {
- lock.writeLock().unlock();
- }
+ // TODO just set the correlation id don't get it from the
+ // message
+ String correlationId = null;
+ if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) {
+ correlationId = UUID.randomUUID().toString().replace("-", "");
+ } else {
+ correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+ }
+ Object responseObject = null;
+ Exchanger<Object> messageExchanger = new Exchanger<Object>();
+ SjmsExchangeMessageHelper.setCorrelationId(request, correlationId);
+ try {
+ lock.writeLock().lock();
+ exchangerMap.put(correlationId, messageExchanger);
+ } finally {
+ lock.writeLock().unlock();
+ }
- MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
- SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
- consumers.returnObject(consumer);
- producer.getMessageProducer().send(request);
-
- // Return the producer to the pool so another waiting producer
- // can move forward
- // without waiting on us to complete the exchange
- try {
- getProducers().returnObject(producer);
- } catch (Exception exception) {
- // thrown if the pool is full. safe to ignore.
- }
+ MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
+ SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+ consumers.returnObject(consumer);
+ producer.getMessageProducer().send(request);
- try {
- responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+ // Return the producer to the pool so another waiting producer
+ // can move forward
+ // without waiting on us to complete the exchange
+ try {
+ getProducers().returnObject(producer);
+ } catch (Exception exception) {
+ // thrown if the pool is full. safe to ignore.
+ }
- try {
- lock.writeLock().lock();
- exchangerMap.remove(correlationId);
- } finally {
- lock.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.debug("Exchanger was interrupted while waiting on response", e);
- exchange.setException(e);
- } catch (TimeoutException e) {
- log.debug("Exchanger timed out while waiting on response", e);
- exchange.setException(e);
- }
+ try {
+ responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
- if (exchange.getException() == null) {
- if (responseObject instanceof Throwable) {
- exchange.setException((Throwable)responseObject);
- } else if (responseObject instanceof Message) {
- Message response = (Message)responseObject;
- SjmsExchangeMessageHelper.populateExchange(response, exchange, true);
- } else {
- exchange.setException(new CamelException("Unknown response type: " + responseObject));
- }
- }
+ try {
+ lock.writeLock().lock();
+ exchangerMap.remove(correlationId);
+ } finally {
+ lock.writeLock().unlock();
}
+ } catch (InterruptedException e) {
+ log.debug("Exchanger was interrupted while waiting on response", e);
+ exchange.setException(e);
+ } catch (TimeoutException e) {
+ log.debug("Exchanger timed out while waiting on response", e);
+ exchange.setException(e);
+ }
- callback.done(isSynchronous());
+ if (exchange.getException() == null) {
+ if (responseObject instanceof Throwable) {
+ exchange.setException((Throwable)responseObject);
+ } else if (responseObject instanceof Message) {
+ Message response = (Message)responseObject;
+ SjmsExchangeMessageHelper.populateExchange(response, exchange, true);
+ } else {
+ exchange.setException(new CamelException("Unknown response type: " + responseObject));
+ }
}
+
+ callback.done(isSynchronous());
}
public void setConsumers(MessageConsumerPool consumers) {
[2/2] git commit: CAMEL-7727 Updated the SjmsProducer code
Posted by ni...@apache.org.
CAMEL-7727 Updated the SjmsProducer code
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2cff2f15
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2cff2f15
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2cff2f15
Branch: refs/heads/master
Commit: 2cff2f15faf427d2fcc8e55ba3777e94eeb8d1ec
Parents: f683b0b
Author: Cristiano Nicolai <cr...@gmail.com>
Authored: Wed Aug 20 23:56:14 2014 +1000
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Aug 21 10:16:18 2014 +0800
----------------------------------------------------------------------
.../main/java/org/apache/camel/component/sjms/SjmsProducer.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2cff2f15/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 3edc761..f7474d0 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -204,8 +204,8 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
}
try {
- final MessageProducerResources producer = getProducers().borrowObject(getResponseTimeOut());
- if(producer==null){
+ final MessageProducerResources producer = getProducers().borrowObject();
+ if (producer == null) {
exchange.setException(new Exception("Unable to send message: connection not available"));
} else {
if (!isSynchronous()) {