You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/06 09:39:09 UTC

[1/2] camel git commit: CAMEL-7833 InOnly and InOut routes as Observable<Exchange> sequences

Repository: camel
Updated Branches:
  refs/heads/master 9b8397f08 -> 9020df3be


CAMEL-7833 InOnly and InOut routes as Observable<Exchange> sequences


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33ebe714
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33ebe714
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33ebe714

Branch: refs/heads/master
Commit: 33ebe714a9b304632eaeed2bdd0859d583b41563
Parents: 9b8397f
Author: Jyrki Ruuskanen <yu...@kotikone.fi>
Authored: Sun Apr 5 12:59:12 2015 +0300
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Apr 6 15:38:47 2015 +0800

----------------------------------------------------------------------
 components/camel-rx/pom.xml                     |  9 ++++
 .../java/org/apache/camel/rx/CamelOperator.java | 24 +++------
 .../java/org/apache/camel/rx/ReactiveCamel.java | 28 ++++++++++-
 .../org/apache/camel/rx/CamelOperatorTest.java  | 53 +++++++++++++++++---
 .../src/test/resources/log4j.properties         |  2 +-
 5 files changed, 87 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rx/pom.xml b/components/camel-rx/pom.xml
index cb4827f..63baf5f 100644
--- a/components/camel-rx/pom.xml
+++ b/components/camel-rx/pom.xml
@@ -39,6 +39,10 @@
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-restlet</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>io.reactivex</groupId>
@@ -62,6 +66,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.jayway.restassured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <version>2.3.0</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index 2a6fa3a..917f069 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -19,14 +19,13 @@ package org.apache.camel.rx;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.processor.PipelineHelper;
 import org.apache.camel.util.ServiceHelper;
 import rx.Observable;
 import rx.Subscriber;
 
-public class CamelOperator implements Observable.Operator<Message, Message> {
+public class CamelOperator implements Observable.Operator<Exchange, Exchange> {
 
     private ProducerTemplate producerTemplate;
     private Endpoint endpoint;
@@ -44,8 +43,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
     }
 
     @Override
-    public Subscriber<? super Message> call(final Subscriber<? super Message> s) {
-        return new Subscriber<Message>(s) {
+    public Subscriber<? super Exchange> call(final Subscriber<? super Exchange> s) {
+        return new Subscriber<Exchange>(s) {
             @Override
             public void onCompleted() {
                 try {
@@ -70,19 +69,14 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
             }
 
             @Override
-            public void onNext(Message item) {
+            public void onNext(Exchange item) {
                 if (!s.isUnsubscribed()) {
                     Exchange exchange = process(item);
                     if (exchange.getException() != null) {
                         s.onError(exchange.getException());
                     } else {
-                        if (exchange.hasOut()) {
-                            s.onNext(exchange.getOut());
-                        } else {
-                            s.onNext(exchange.getIn());
-                        }
+                        s.onNext(PipelineHelper.createNextExchange(exchange));
                     }
-
                 }
             }
         };
@@ -96,10 +90,4 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
         }
         return exchange;
     }
-
-    private Exchange process(Message message) {
-        Exchange exchange = endpoint.createExchange();
-        exchange.setIn(message);
-        return process(exchange);
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index 678c4e8..e0eb869 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
 import org.apache.camel.rx.support.EndpointObservable;
 import org.apache.camel.rx.support.EndpointSubscribeFunc;
 import org.apache.camel.rx.support.ExchangeToBodyFunc1;
-import org.apache.camel.rx.support.ExchangeToMessageFunc1;
 import org.apache.camel.rx.support.ObserverSender;
 import org.apache.camel.util.CamelContextHelper;
 import rx.Observable;
@@ -62,7 +61,7 @@ public class ReactiveCamel {
      * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
      */
     public Observable<Message> toObservable(Endpoint endpoint) {
-        return createEndpointObservable(endpoint, ExchangeToMessageFunc1.getInstance());
+        return toObservable(endpoint, Message.class);
     }
 
     /**
@@ -93,6 +92,20 @@ public class ReactiveCamel {
     }
 
     /**
+     * Convenience method for beginning the route
+     */
+    public Observable<Exchange> from(Endpoint endpoint) {
+        return createEndpointObservable(endpoint);
+    }
+
+    /**
+     * Convenience method for beginning the route
+     */
+    public Observable<Exchange> from(String uri) {
+        return from(endpoint(uri));
+    }
+
+    /**
      * Convenience method for creating CamelOperator instances
      */
     public CamelOperator to(String uri) throws Exception {
@@ -124,4 +137,15 @@ public class ReactiveCamel {
         return new EndpointObservable<T>(endpoint, func);
     }
 
+    /**
+     * Return a newly created {@link Observable} without conversion
+     */
+    protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
+        return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<>(endpoint, new Func1<Exchange, Exchange>() {
+            @Override
+            public Exchange call(Exchange exchange) {
+                return exchange;
+            }
+        }));
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
index f0bacc3..8d09858 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -18,15 +18,18 @@ package org.apache.camel.rx;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.Message;
+import org.apache.camel.Exchange;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Observable;
 import rx.Subscription;
+import rx.functions.Func1;
 import rx.observables.ConnectableObservable;
 
+import static com.jayway.restassured.RestAssured.*;
+import static org.hamcrest.Matchers.*;
+
 /**
  */
 public class CamelOperatorTest extends RxTestSupport {
@@ -37,30 +40,64 @@ public class CamelOperatorTest extends RxTestSupport {
         final MockEndpoint mockEndpoint1 = camelContext.getEndpoint("mock:results1", MockEndpoint.class);
         final MockEndpoint mockEndpoint2 = camelContext.getEndpoint("mock:results2", MockEndpoint.class);
         final MockEndpoint mockEndpoint3 = camelContext.getEndpoint("mock:results3", MockEndpoint.class);
+        final MockEndpoint mockEndpoint4 = camelContext.getEndpoint("mock:results4", MockEndpoint.class);
         mockEndpoint1.expectedMessageCount(2);
         mockEndpoint2.expectedMessageCount(1);
         mockEndpoint3.expectedMessageCount(1);
+        mockEndpoint4.expectedMessageCount(2);
 
-        ConnectableObservable<Message> route = reactiveCamel.toObservable("direct:start")
+        // Define an InOnly route
+        ConnectableObservable<Exchange> inOnly = reactiveCamel.from("direct:start")
             .lift(new CamelOperator(mockEndpoint1))
-            .lift(new CamelOperator(camelContext, "log:foo"))
+            .lift(new CamelOperator(camelContext, "log:inOnly"))
             .debounce(1, TimeUnit.SECONDS)
             .lift(reactiveCamel.to(mockEndpoint2))
             .lift(reactiveCamel.to("mock:results3"))
             .publish();
 
         // Start the route
-        Subscription routeSubscription = route.connect();
+        Subscription inSubscription = inOnly.connect();
 
         // Send two test messages
-        producerTemplate.sendBody("direct:start", "<test/>");
-        producerTemplate.sendBody("direct:start", "<test/>");
+        producerTemplate.sendBody("direct:start", "<test1/>");
+        producerTemplate.sendBody("direct:start", "<test2/>");
+
+        // Define an InOut route
+        ConnectableObservable<Exchange> inOut = reactiveCamel.from("restlet:http://localhost:9080/test?restletMethod=POST")
+            .map(new Func1<Exchange, Exchange>() { // Convert body to String
+                @Override
+                public Exchange call(Exchange exchange) {
+                    exchange.getIn().setBody(exchange.getIn().getBody(String.class));
+                    return exchange;
+                }
+            })
+            .lift(reactiveCamel.to("log:inOut"))
+            .map(new Func1<Exchange, Exchange>() { // Change body for response
+                @Override
+                public Exchange call(Exchange exchange) {
+                    exchange.getIn().setBody(exchange.getIn().getBody(String.class) + " back");
+                    return exchange;
+                }
+            })
+            .lift(reactiveCamel.to(mockEndpoint4))
+            .publish();
+
+        // Start the route
+        Subscription inoutSubscription = inOut.connect();
+
+        // Send two messages and check the responses
+        given().body("hello").when().post("http://localhost:9080/test").then().assertThat().body(containsString("hello back"));
+        given().body("holla").when().post("http://localhost:9080/test").then().assertThat().body(containsString("holla back"));
 
         mockEndpoint1.assertIsSatisfied();
         mockEndpoint2.assertIsSatisfied();
         mockEndpoint3.assertIsSatisfied();
+        mockEndpoint4.assertIsSatisfied();
+
+        // Stop the route
+        inSubscription.unsubscribe();
 
         // Stop the route
-        routeSubscription.unsubscribe();
+        inoutSubscription.unsubscribe();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/resources/log4j.properties b/components/camel-rx/src/test/resources/log4j.properties
index 747baca..4fbe6ae 100644
--- a/components/camel-rx/src/test/resources/log4j.properties
+++ b/components/camel-rx/src/test/resources/log4j.properties
@@ -32,5 +32,5 @@ log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.file.file=target/camel-tx-test.log
+log4j.appender.file.file=target/camel-rx-test.log
 log4j.appender.file.append=true


[2/2] camel git commit: CAMEL-7833 clean up the pom file of camel-rx

Posted by ni...@apache.org.
CAMEL-7833 clean up the pom file of camel-rx


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9020df3b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9020df3b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9020df3b

Branch: refs/heads/master
Commit: 9020df3be5a0e66e95ca82441b91b9575581c54a
Parents: 33ebe71
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Apr 6 15:37:43 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Apr 6 15:38:56 2015 +0800

----------------------------------------------------------------------
 components/camel-rx/pom.xml                              | 11 ++++++-----
 .../test/java/org/apache/camel/rx/CamelOperatorTest.java |  4 ++--
 2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9020df3b/components/camel-rx/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rx/pom.xml b/components/camel-rx/pom.xml
index 63baf5f..29f42dd 100644
--- a/components/camel-rx/pom.xml
+++ b/components/camel-rx/pom.xml
@@ -39,11 +39,7 @@
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-core</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-restlet</artifactId>
-    </dependency>
-
+    
     <dependency>
       <groupId>io.reactivex</groupId>
       <artifactId>rxjava</artifactId>
@@ -53,6 +49,10 @@
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-restlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-test</artifactId>
       <scope>test</scope>
     </dependency>
@@ -70,6 +70,7 @@
       <groupId>com.jayway.restassured</groupId>
       <artifactId>rest-assured</artifactId>
       <version>2.3.0</version>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9020df3b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
index 8d09858..f9e30d9 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -27,8 +27,8 @@ import rx.Subscription;
 import rx.functions.Func1;
 import rx.observables.ConnectableObservable;
 
-import static com.jayway.restassured.RestAssured.*;
-import static org.hamcrest.Matchers.*;
+import static com.jayway.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.containsString;
 
 /**
  */