You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/08/21 04:23:07 UTC

[1/2] git commit: CAMEL-7727: Unify MessageProducerResources handling into SjmsProducer

Repository: camel
Updated Branches:
  refs/heads/master b0c572fe6 -> 2cff2f15f


CAMEL-7727: Unify MessageProducerResources handling into SjmsProducer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f683b0b7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f683b0b7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f683b0b7

Branch: refs/heads/master
Commit: f683b0b7b1933fa492ab4f7a456691d90b17698c
Parents: b0c572f
Author: Cristiano Nicolai <cr...@gmail.com>
Authored: Wed Aug 20 21:32:05 2014 +1000
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Aug 21 10:08:04 2014 +0800

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsProducer.java      |  39 +++---
 .../component/sjms/producer/InOnlyProducer.java |  53 ++++----
 .../component/sjms/producer/InOutProducer.java  | 130 +++++++++----------
 3 files changed, 105 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 1a31818..3edc761 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -195,7 +195,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
 
     public abstract MessageProducerResources doCreateProducerModel() throws Exception;
 
-    public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception;
+    public abstract void sendMessage(Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception;
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
@@ -204,25 +204,30 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
         }
 
         try {
-            if (!isSynchronous()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("  Sending message asynchronously: {}", exchange.getIn().getBody());
-                }
-                getExecutor().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            sendMessage(exchange, callback);
-                        } catch (Exception e) {
-                            ObjectHelper.wrapRuntimeCamelException(e);
+            final MessageProducerResources producer = getProducers().borrowObject(getResponseTimeOut());
+            if(producer==null){
+                exchange.setException(new Exception("Unable to send message: connection not available"));
+            } else {
+                if (!isSynchronous()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("  Sending message asynchronously: {}", exchange.getIn().getBody());
+                    }
+                    getExecutor().execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                sendMessage(exchange, callback, producer);
+                            } catch (Exception e) {
+                                ObjectHelper.wrapRuntimeCamelException(e);
+                            }
                         }
+                    });
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("  Sending message synchronously: {}", exchange.getIn().getBody());
                     }
-                });
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("  Sending message synchronously: {}", exchange.getIn().getBody());
+                    sendMessage(exchange, callback, producer);
                 }
-                sendMessage(exchange, callback);
             }
         } catch (Exception e) {
             if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index e841e6b..c83546f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -91,41 +91,36 @@ public class InOnlyProducer extends SjmsProducer {
      * @throws Exception
      */
     @Override
-    public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
-        Collection<Message> messages = new ArrayList<Message>(1);
-        MessageProducerResources producer = getProducers().borrowObject();
+    public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception {
         try {
-            if (producer != null) {
-                if (exchange.getIn().getBody() != null) {
-                    if (exchange.getIn().getBody() instanceof List) {
-                        Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody();
-                        for (final Object object : payload) {
-                            Message message;
-                            if (BatchMessage.class.isInstance(object)) {
-                                BatchMessage<?> batchMessage = (BatchMessage<?>)object;
-                                message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
-                                    .getJmsKeyFormatStrategy());
-                            } else {
-                                message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
-                            }
-                            messages.add(message);
+            Collection<Message> messages = new ArrayList<Message>(1);
+            if (exchange.getIn().getBody() != null) {
+                if (exchange.getIn().getBody() instanceof List) {
+                    Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody();
+                    for (final Object object : payload) {
+                        Message message;
+                        if (BatchMessage.class.isInstance(object)) {
+                            BatchMessage<?> batchMessage = (BatchMessage<?>)object;
+                            message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
+                                .getJmsKeyFormatStrategy());
+                        } else {
+                            message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
                         }
-                    } else {
-                        Object payload = exchange.getIn().getBody();
-                        Message message = JmsMessageHelper
-                            .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
                         messages.add(message);
                     }
+                } else {
+                    Object payload = exchange.getIn().getBody();
+                    Message message = JmsMessageHelper
+                        .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+                    messages.add(message);
                 }
+            }
 
-                if (isEndpointTransacted()) {
-                    exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
-                }
-                for (final Message message : messages) {
-                    producer.getMessageProducer().send(message);
-                }
-            } else {
-                exchange.setException(new Exception("Unable to send message: connection not available"));
+            if (isEndpointTransacted()) {
+                exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
+            }
+            for (final Message message : messages) {
+                producer.getMessageProducer().send(message);
             }
         } catch (Exception e) {
             exchange.setException(new Exception("Unable to complete sending the message: " + e.getLocalizedMessage()));

http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 2b93df7..605b15c 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -282,86 +282,74 @@ public class InOutProducer extends SjmsProducer {
      * @throws Exception
      */
     @Override
-    public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
-        if (getProducers() != null) {
-            MessageProducerResources producer = null;
-            try {
-                producer = getProducers().borrowObject(getResponseTimeOut());
-            } catch (Exception e1) {
-                log.warn("The producer pool is exhausted.  Consider setting producerCount to a higher value or disable the fixed size of the pool by setting fixedResourcePool=false.");
-                exchange.setException(new Exception("Producer Resource Pool is exhausted"));
-            }
-            if (producer != null) {
-
-                if (isEndpointTransacted()) {
-                    exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
-                }
+    public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception {
+        if (isEndpointTransacted()) {
+            exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
+        }
 
-                Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+        Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy());
 
-                // TODO just set the correlation id don't get it from the
-                // message
-                String correlationId = null;
-                if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) {
-                    correlationId = UUID.randomUUID().toString().replace("-", "");
-                } else {
-                    correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
-                }
-                Object responseObject = null;
-                Exchanger<Object> messageExchanger = new Exchanger<Object>();
-                SjmsExchangeMessageHelper.setCorrelationId(request, correlationId);
-                try {
-                    lock.writeLock().lock();
-                    exchangerMap.put(correlationId, messageExchanger);
-                } finally {
-                    lock.writeLock().unlock();
-                }
+        // TODO just set the correlation id don't get it from the
+        // message
+        String correlationId = null;
+        if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) {
+            correlationId = UUID.randomUUID().toString().replace("-", "");
+        } else {
+            correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+        }
+        Object responseObject = null;
+        Exchanger<Object> messageExchanger = new Exchanger<Object>();
+        SjmsExchangeMessageHelper.setCorrelationId(request, correlationId);
+        try {
+            lock.writeLock().lock();
+            exchangerMap.put(correlationId, messageExchanger);
+        } finally {
+            lock.writeLock().unlock();
+        }
 
-                MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
-                SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
-                consumers.returnObject(consumer);
-                producer.getMessageProducer().send(request);
-
-                // Return the producer to the pool so another waiting producer
-                // can move forward
-                // without waiting on us to complete the exchange
-                try {
-                    getProducers().returnObject(producer);
-                } catch (Exception exception) {
-                    // thrown if the pool is full. safe to ignore.
-                }
+        MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
+        SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+        consumers.returnObject(consumer);
+        producer.getMessageProducer().send(request);
 
-                try {
-                    responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+        // Return the producer to the pool so another waiting producer
+        // can move forward
+        // without waiting on us to complete the exchange
+        try {
+            getProducers().returnObject(producer);
+        } catch (Exception exception) {
+            // thrown if the pool is full. safe to ignore.
+        }
 
-                    try {
-                        lock.writeLock().lock();
-                        exchangerMap.remove(correlationId);
-                    } finally {
-                        lock.writeLock().unlock();
-                    }
-                } catch (InterruptedException e) {
-                    log.debug("Exchanger was interrupted while waiting on response", e);
-                    exchange.setException(e);
-                } catch (TimeoutException e) {
-                    log.debug("Exchanger timed out while waiting on response", e);
-                    exchange.setException(e);
-                }
+        try {
+            responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
 
-                if (exchange.getException() == null) {
-                    if (responseObject instanceof Throwable) {
-                        exchange.setException((Throwable)responseObject);
-                    } else if (responseObject instanceof Message) {
-                        Message response = (Message)responseObject;
-                        SjmsExchangeMessageHelper.populateExchange(response, exchange, true);
-                    } else {
-                        exchange.setException(new CamelException("Unknown response type: " + responseObject));
-                    }
-                }
+            try {
+                lock.writeLock().lock();
+                exchangerMap.remove(correlationId);
+            } finally {
+                lock.writeLock().unlock();
             }
+        } catch (InterruptedException e) {
+            log.debug("Exchanger was interrupted while waiting on response", e);
+            exchange.setException(e);
+        } catch (TimeoutException e) {
+            log.debug("Exchanger timed out while waiting on response", e);
+            exchange.setException(e);
+        }
 
-            callback.done(isSynchronous());
+        if (exchange.getException() == null) {
+            if (responseObject instanceof Throwable) {
+                exchange.setException((Throwable)responseObject);
+            } else if (responseObject instanceof Message) {
+                Message response = (Message)responseObject;
+                SjmsExchangeMessageHelper.populateExchange(response, exchange, true);
+            } else {
+                exchange.setException(new CamelException("Unknown response type: " + responseObject));
+            }
         }
+
+        callback.done(isSynchronous());
     }
 
     public void setConsumers(MessageConsumerPool consumers) {


[2/2] git commit: CAMEL-7727 Updated the SjmsProducer code

Posted by ni...@apache.org.
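CAMEL-7727 Updated the SjmsProducer code: borrow the producer via borrowObject() without the response timeout, and fix the formatting of the null check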


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2cff2f15
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2cff2f15
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2cff2f15

Branch: refs/heads/master
Commit: 2cff2f15faf427d2fcc8e55ba3777e94eeb8d1ec
Parents: f683b0b
Author: Cristiano Nicolai <cr...@gmail.com>
Authored: Wed Aug 20 23:56:14 2014 +1000
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Aug 21 10:16:18 2014 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/sjms/SjmsProducer.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2cff2f15/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 3edc761..f7474d0 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -204,8 +204,8 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
         }
 
         try {
-            final MessageProducerResources producer = getProducers().borrowObject(getResponseTimeOut());
-            if(producer==null){
+            final MessageProducerResources producer = getProducers().borrowObject();
+            if (producer == null) {
                 exchange.setException(new Exception("Unable to send message: connection not available"));
             } else {
                 if (!isSynchronous()) {