You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/02 09:48:44 UTC
[3/4] camel git commit: CAMEL-7833 Use ProducerTemplate to send
message and pass the exception to the subscriber
CAMEL-7833 Use ProducerTemplate to send message and pass the exception to the subscriber
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d85f2f0c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d85f2f0c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d85f2f0c
Branch: refs/heads/master
Commit: d85f2f0c57021066ec0d674b9c6b4b8be2ac3344
Parents: 0d9928a
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 2 15:47:48 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/camel/rx/CamelOperator.java | 48 ++++++++++++--------
1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d85f2f0c/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index f965388..2a6fa3a 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -21,21 +21,26 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
import org.apache.camel.util.ServiceHelper;
import rx.Observable;
import rx.Subscriber;
public class CamelOperator implements Observable.Operator<Message, Message> {
- private Producer producer;
+ private ProducerTemplate producerTemplate;
+ private Endpoint endpoint;
public CamelOperator(CamelContext context, String uri) throws Exception {
- this(context.getEndpoint(uri));
+ producerTemplate = context.createProducerTemplate();
+ endpoint = context.getEndpoint(uri);
+ ServiceHelper.startService(producerTemplate);
}
public CamelOperator(Endpoint endpoint) throws Exception {
- this.producer = endpoint.createProducer();
- ServiceHelper.startService(producer);
+ this.producerTemplate = endpoint.getCamelContext().createProducerTemplate();
+ this.endpoint = endpoint;
+ ServiceHelper.startService(producerTemplate);
}
@Override
@@ -44,11 +49,11 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
@Override
public void onCompleted() {
try {
- ServiceHelper.stopService(producer);
+ ServiceHelper.stopService(producerTemplate);
} catch (Exception e) {
throw new RuntimeCamelRxException(e);
} finally {
- producer = null;
+ producerTemplate = null;
}
if (!s.isUnsubscribed()) {
s.onCompleted();
@@ -57,9 +62,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
@Override
public void onError(Throwable e) {
- Exchange exchange = producer.createExchange();
- exchange.setException(e);
- process(exchange);
+ // producer cannot handle the exception
+ // so we just pass the exception to the subscriber
if (!s.isUnsubscribed()) {
s.onError(e);
}
@@ -68,7 +72,17 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
@Override
public void onNext(Message item) {
if (!s.isUnsubscribed()) {
- s.onNext(process(item));
+ Exchange exchange = process(item);
+ if (exchange.getException() != null) {
+ s.onError(exchange.getException());
+ } else {
+ if (exchange.hasOut()) {
+ s.onNext(exchange.getOut());
+ } else {
+ s.onNext(exchange.getIn());
+ }
+ }
+
}
}
};
@@ -76,20 +90,16 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
private Exchange process(Exchange exchange) {
try {
- producer.process(exchange);
- if (exchange.hasOut()) {
- exchange.setIn(exchange.getOut());
- exchange.setOut(null);
- }
+ exchange = producerTemplate.send(endpoint, exchange);
} catch (Exception e) {
- throw new RuntimeCamelRxException(e);
+ exchange.setException(e);
}
return exchange;
}
- private Message process(Message message) {
- Exchange exchange = producer.createExchange();
+ private Exchange process(Message message) {
+ Exchange exchange = endpoint.createExchange();
exchange.setIn(message);
- return process(exchange).getIn();
+ return process(exchange);
}
}