You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/01/10 10:59:02 UTC
git commit: CAMEL-7107 avoid the NPE in case of connection loss with
thanks to Marios
Updated Branches:
refs/heads/master a0ee918f3 -> c61a64eab
CAMEL-7107 avoid the NPE in case of connection loss with thanks to Marios
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c61a64ea
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c61a64ea
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c61a64ea
Branch: refs/heads/master
Commit: c61a64eabcdeee25f296a371418050ac45669853
Parents: a0ee918
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Jan 10 17:57:07 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Jan 10 17:58:47 2014 +0800
----------------------------------------------------------------------
.../component/sjms/producer/InOnlyProducer.java | 35 +++++++++++---------
1 file changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c61a64ea/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index df689b2..84eb1f5 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.sjms.producer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import javax.jms.Connection;
@@ -25,9 +26,9 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.BatchMessage;
-import org.apache.camel.component.sjms.SjmsEndpoint;
import org.apache.camel.component.sjms.SjmsProducer;
import org.apache.camel.component.sjms.TransactionCommitStrategy;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
@@ -40,7 +41,7 @@ import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
*/
public class InOnlyProducer extends SjmsProducer {
- public InOnlyProducer(SjmsEndpoint endpoint) {
+ public InOnlyProducer(final Endpoint endpoint) {
super(endpoint);
}
@@ -55,11 +56,9 @@ public class InOnlyProducer extends SjmsProducer {
Connection conn = null;
try {
conn = getConnectionResource().borrowConnection();
-
TransactionCommitStrategy commitStrategy = null;
- Session session = null;
- MessageProducer messageProducer = null;
-
+ Session session;
+
if (isEndpointTransacted()) {
if (getCommitStrategy() != null) {
commitStrategy = getCommitStrategy();
@@ -70,6 +69,8 @@ public class InOnlyProducer extends SjmsProducer {
} else {
session = conn.createSession(false, getAcknowledgeMode());
}
+
+ MessageProducer messageProducer;
if (isTopic()) {
messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
} else {
@@ -94,16 +95,16 @@ public class InOnlyProducer extends SjmsProducer {
* @throws Exception
*/
@Override
- public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
- List<Message> messages = new ArrayList<Message>();
+ public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
+ Collection<Message> messages = new ArrayList<Message>(1);
MessageProducerResources producer = getProducers().borrowObject();
try {
- if (getProducers() != null) {
+ if (producer != null) {
if (exchange.getIn().getBody() != null) {
if (exchange.getIn().getBody() instanceof List) {
- List<?> payload = (List<?>)exchange.getIn().getBody();
- for (Object object : payload) {
- Message message = null;
+ Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody();
+ for (final Object object : payload) {
+ Message message;
if (BatchMessage.class.isInstance(object)) {
BatchMessage<?> batchMessage = (BatchMessage<?>)object;
message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
@@ -124,14 +125,18 @@ public class InOnlyProducer extends SjmsProducer {
if (isEndpointTransacted()) {
exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
}
- for (Message message : messages) {
+ for (final Message message : messages) {
producer.getMessageProducer().send(message);
}
+ } else {
+ exchange.setException(new Exception("Unable to send message: connection not available"));
}
} catch (Exception e) {
- exchange.setException(new Exception("Unable to complet sending the message: " + e.getLocalizedMessage()));
+ exchange.setException(new Exception("Unable to complete sending the message: " + e.getLocalizedMessage()));
} finally {
- getProducers().returnObject(producer);
+ if (producer != null) {
+ getProducers().returnObject(producer);
+ }
callback.done(isSynchronous());
}
}