You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/02 09:48:44 UTC

[3/4] camel git commit: CAMEL-7833 Use ProducerTemplate to send message and pass the exception to the subscriber

CAMEL-7833 Use ProducerTemplate to send message and pass the exception to the subscriber


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d85f2f0c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d85f2f0c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d85f2f0c

Branch: refs/heads/master
Commit: d85f2f0c57021066ec0d674b9c6b4b8be2ac3344
Parents: 0d9928a
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 2 15:47:48 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/CamelOperator.java | 48 ++++++++++++--------
 1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d85f2f0c/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index f965388..2a6fa3a 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -21,21 +21,26 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
 import org.apache.camel.util.ServiceHelper;
 import rx.Observable;
 import rx.Subscriber;
 
 public class CamelOperator implements Observable.Operator<Message, Message> {
 
-    private Producer producer;
+    private ProducerTemplate producerTemplate;
+    private Endpoint endpoint;
 
     public CamelOperator(CamelContext context, String uri) throws Exception {
-        this(context.getEndpoint(uri));
+        producerTemplate = context.createProducerTemplate();
+        endpoint = context.getEndpoint(uri);
+        ServiceHelper.startService(producerTemplate);
     }
 
     public CamelOperator(Endpoint endpoint) throws Exception {
-        this.producer = endpoint.createProducer();
-        ServiceHelper.startService(producer);
+        this.producerTemplate = endpoint.getCamelContext().createProducerTemplate();
+        this.endpoint = endpoint;
+        ServiceHelper.startService(producerTemplate);
     }
 
     @Override
@@ -44,11 +49,11 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
             @Override
             public void onCompleted() {
                 try {
-                    ServiceHelper.stopService(producer);
+                    ServiceHelper.stopService(producerTemplate);
                 } catch (Exception e) {
                     throw new RuntimeCamelRxException(e);
                 } finally {
-                    producer = null;
+                    producerTemplate = null;
                 }
                 if (!s.isUnsubscribed()) {
                     s.onCompleted();
@@ -57,9 +62,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
 
             @Override
             public void onError(Throwable e) {
-                Exchange exchange = producer.createExchange();
-                exchange.setException(e);
-                process(exchange);
+                // the producer cannot handle the exception,
+                // so we just pass the exception to the subscriber
                 if (!s.isUnsubscribed()) {
                     s.onError(e);
                 }
@@ -68,7 +72,17 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
             @Override
             public void onNext(Message item) {
                 if (!s.isUnsubscribed()) {
-                    s.onNext(process(item));
+                    Exchange exchange = process(item);
+                    if (exchange.getException() != null) {
+                        s.onError(exchange.getException());
+                    } else {
+                        if (exchange.hasOut()) {
+                            s.onNext(exchange.getOut());
+                        } else {
+                            s.onNext(exchange.getIn());
+                        }
+                    }
+
                 }
             }
         };
@@ -76,20 +90,16 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
 
     private Exchange process(Exchange exchange) {
         try {
-            producer.process(exchange);
-            if (exchange.hasOut()) {
-                exchange.setIn(exchange.getOut());
-                exchange.setOut(null);
-            }
+            exchange = producerTemplate.send(endpoint, exchange);
         } catch (Exception e) {
-            throw new RuntimeCamelRxException(e);
+            exchange.setException(e);
         }
         return exchange;
     }
 
-    private Message process(Message message) {
-        Exchange exchange = producer.createExchange();
+    private Exchange process(Message message) {
+        Exchange exchange = endpoint.createExchange();
         exchange.setIn(message);
-        return process(exchange).getIn();
+        return process(exchange);
     }
 }