You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/02 09:48:42 UTC

[1/4] camel git commit: CAMEL-7833 Added CamelOperator for lift()

Repository: camel
Updated Branches:
  refs/heads/master 33e7087fb -> d85f2f0c5


CAMEL-7833 Added CamelOperator for lift()


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/081e8a73
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/081e8a73
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/081e8a73

Branch: refs/heads/master
Commit: 081e8a73195ce5776b3581b31ed15cbcadc68624
Parents: 33e7087
Author: Jyrki Ruuskanen <yu...@kotikone.fi>
Authored: Wed Apr 1 17:54:26 2015 +0300
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:03 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/CamelOperator.java | 79 ++++++++++++++++++++
 1 file changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/081e8a73/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
new file mode 100644
index 0000000..c218776
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -0,0 +1,79 @@
+package org.apache.camel.rx;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+import rx.Observable;
+import rx.Subscriber;
+
+public class CamelOperator implements Observable.Operator<Message, Message> {
+
+    private Producer producer;
+
+    public CamelOperator(CamelContext context, String uri) throws Exception {
+        this(context.getEndpoint(uri));
+    }
+
+    public CamelOperator(Endpoint endpoint) throws Exception {
+        this.producer = endpoint.createProducer();
+        ServiceHelper.startService(producer);
+    }
+
+    @Override
+    public Subscriber<? super Message> call(final Subscriber<? super Message> s) {
+        return new Subscriber<Message>(s) {
+            @Override
+            public void onCompleted() {
+                try {
+                    ServiceHelper.stopService(producer);
+                } catch (Exception e) {
+                    throw new RuntimeCamelRxException(e);
+                } finally {
+                    producer = null;
+                }
+                if (!s.isUnsubscribed()) {
+                    s.onCompleted();
+                }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                Exchange exchange = producer.createExchange();
+                exchange.setException(e);
+                process(exchange);
+                if (!s.isUnsubscribed()) {
+                    s.onError(e);
+                }
+            }
+
+            @Override
+            public void onNext(Message item) {
+                if (!s.isUnsubscribed()) {
+                    s.onNext(process(item));
+                }
+            }
+        };
+    }
+
+    private Exchange process(Exchange exchange) {
+        try {
+            producer.process(exchange);
+            if (exchange.hasOut()) {
+                exchange.setIn(exchange.getOut());
+                exchange.setOut(null);
+            }
+        } catch (Exception e) {
+            throw new RuntimeCamelRxException(e);
+        }
+        return exchange;
+    }
+
+    private Message process(Message message) {
+        Exchange exchange = producer.createExchange();
+        exchange.setIn(message);
+        return process(exchange).getIn();
+    }
+}


[2/4] camel git commit: CAMEL-7833 Added CamelOperatorTest

Posted by ni...@apache.org.
CAMEL-7833 Added CamelOperatorTest


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d9b777a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d9b777a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d9b777a

Branch: refs/heads/master
Commit: 2d9b777a02cbe8359b738fadadf28da3eb4b96f6
Parents: 081e8a7
Author: Jyrki Ruuskanen <yu...@kotikone.fi>
Authored: Wed Apr 1 18:34:47 2015 +0300
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/CamelOperator.java | 16 ++++++
 .../org/apache/camel/rx/CamelOperatorTest.java  | 57 ++++++++++++++++++++
 2 files changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2d9b777a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index c218776..f965388 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.rx;
 
 import org.apache.camel.CamelContext;

http://git-wip-us.apache.org/repos/asf/camel/blob/2d9b777a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
new file mode 100644
index 0000000..8667e3b
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Message;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+
+/**
+ */
+public class CamelOperatorTest extends RxTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelOperatorTest.class);
+
+    @Test
+    public void testCamelOperator() throws Exception {
+        final MockEndpoint mockEndpoint1 = camelContext.getEndpoint("mock:results1", MockEndpoint.class);
+        final MockEndpoint mockEndpoint2 = camelContext.getEndpoint("mock:results2", MockEndpoint.class);
+        final MockEndpoint mockEndpoint3 = camelContext.getEndpoint("mock:results3", MockEndpoint.class);
+        mockEndpoint1.expectedMessageCount(2);
+        mockEndpoint2.expectedMessageCount(1);
+        mockEndpoint3.expectedMessageCount(1);
+
+        Observable<Message> result = reactiveCamel.toObservable("direct:start")
+            .lift(new CamelOperator(camelContext, "mock:results1"))
+            .lift(new CamelOperator(camelContext, "log:foo"))
+            .debounce(1, TimeUnit.SECONDS)
+            .lift(new CamelOperator(mockEndpoint2));
+        reactiveCamel.sendTo(result, "mock:results3");
+
+        // Send two test messages
+        producerTemplate.sendBody("direct:start", "<test/>");
+        producerTemplate.sendBody("direct:start", "<test/>");
+
+        mockEndpoint1.assertIsSatisfied();
+        mockEndpoint2.assertIsSatisfied();
+        mockEndpoint3.assertIsSatisfied();
+    }
+}


[3/4] camel git commit: CAMEL-7833 Use ProducerTemplate to send message and pass the exception to the subscriber

Posted by ni...@apache.org.
CAMEL-7833 Use ProducerTemplate to send message and pass the exception to the subscriber


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d85f2f0c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d85f2f0c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d85f2f0c

Branch: refs/heads/master
Commit: d85f2f0c57021066ec0d674b9c6b4b8be2ac3344
Parents: 0d9928a
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 2 15:47:48 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/CamelOperator.java | 48 ++++++++++++--------
 1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d85f2f0c/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index f965388..2a6fa3a 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -21,21 +21,26 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
 import org.apache.camel.util.ServiceHelper;
 import rx.Observable;
 import rx.Subscriber;
 
 public class CamelOperator implements Observable.Operator<Message, Message> {
 
-    private Producer producer;
+    private ProducerTemplate producerTemplate;
+    private Endpoint endpoint;
 
     public CamelOperator(CamelContext context, String uri) throws Exception {
-        this(context.getEndpoint(uri));
+        producerTemplate = context.createProducerTemplate();
+        endpoint = context.getEndpoint(uri);
+        ServiceHelper.startService(producerTemplate);
     }
 
     public CamelOperator(Endpoint endpoint) throws Exception {
-        this.producer = endpoint.createProducer();
-        ServiceHelper.startService(producer);
+        this.producerTemplate = endpoint.getCamelContext().createProducerTemplate();
+        this.endpoint = endpoint;
+        ServiceHelper.startService(producerTemplate);
     }
 
     @Override
@@ -44,11 +49,11 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
             @Override
             public void onCompleted() {
                 try {
-                    ServiceHelper.stopService(producer);
+                    ServiceHelper.stopService(producerTemplate);
                 } catch (Exception e) {
                     throw new RuntimeCamelRxException(e);
                 } finally {
-                    producer = null;
+                    producerTemplate = null;
                 }
                 if (!s.isUnsubscribed()) {
                     s.onCompleted();
@@ -57,9 +62,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
 
             @Override
             public void onError(Throwable e) {
-                Exchange exchange = producer.createExchange();
-                exchange.setException(e);
-                process(exchange);
+                // producer cannot handler the exception
+                // so we just pass the exchange to the subscriber 
                 if (!s.isUnsubscribed()) {
                     s.onError(e);
                 }
@@ -68,7 +72,17 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
             @Override
             public void onNext(Message item) {
                 if (!s.isUnsubscribed()) {
-                    s.onNext(process(item));
+                    Exchange exchange = process(item);
+                    if (exchange.getException() != null) {
+                        s.onError(exchange.getException());
+                    } else {
+                        if (exchange.hasOut()) {
+                            s.onNext(exchange.getOut());
+                        } else {
+                            s.onNext(exchange.getIn());
+                        }
+                    }
+
                 }
             }
         };
@@ -76,20 +90,16 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
 
     private Exchange process(Exchange exchange) {
         try {
-            producer.process(exchange);
-            if (exchange.hasOut()) {
-                exchange.setIn(exchange.getOut());
-                exchange.setOut(null);
-            }
+            exchange = producerTemplate.send(endpoint, exchange);
         } catch (Exception e) {
-            throw new RuntimeCamelRxException(e);
+            exchange.setException(e);
         }
         return exchange;
     }
 
-    private Message process(Message message) {
-        Exchange exchange = producer.createExchange();
+    private Exchange process(Message message) {
+        Exchange exchange = endpoint.createExchange();
         exchange.setIn(message);
-        return process(exchange).getIn();
+        return process(exchange);
     }
 }


[4/4] camel git commit: Added a comment for the Main.getOrCreateCamelContext()

Posted by ni...@apache.org.
Added a comment for the Main.getOrCreateCamelContext()


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0d9928a2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0d9928a2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0d9928a2

Branch: refs/heads/master
Commit: 0d9928a23e3b2c89638b3ec15dd0e3bb3dc35aa6
Parents: 2d9b777
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 2 13:46:41 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800

----------------------------------------------------------------------
 camel-core/src/main/java/org/apache/camel/main/Main.java | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0d9928a2/camel-core/src/main/java/org/apache/camel/main/Main.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/Main.java b/camel-core/src/main/java/org/apache/camel/main/Main.java
index 7a50785..1a7d74f 100644
--- a/camel-core/src/main/java/org/apache/camel/main/Main.java
+++ b/camel-core/src/main/java/org/apache/camel/main/Main.java
@@ -99,7 +99,11 @@ public class Main extends MainSupport {
     }
 
     /**
+     * 
      * Gets or creates the {@link org.apache.camel.CamelContext} this main class is using.
+     * 
+     * It just create a new CamelContextMap per call, please don't use it to access the camel context that will be ran by main.
+     * If you want to setup the CamelContext please use MainListener to get the new created camel context.
      */
     public CamelContext getOrCreateCamelContext() {
         // force init