You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/06 09:39:09 UTC
[1/2] camel git commit: CAMEL-7833 InOnly and InOut routes as
Observable<Exchange> sequences
Repository: camel
Updated Branches:
refs/heads/master 9b8397f08 -> 9020df3be
CAMEL-7833 InOnly and InOut routes as Observable<Exchange> sequences
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33ebe714
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33ebe714
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33ebe714
Branch: refs/heads/master
Commit: 33ebe714a9b304632eaeed2bdd0859d583b41563
Parents: 9b8397f
Author: Jyrki Ruuskanen <yu...@kotikone.fi>
Authored: Sun Apr 5 12:59:12 2015 +0300
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Apr 6 15:38:47 2015 +0800
----------------------------------------------------------------------
components/camel-rx/pom.xml | 9 ++++
.../java/org/apache/camel/rx/CamelOperator.java | 24 +++------
.../java/org/apache/camel/rx/ReactiveCamel.java | 28 ++++++++++-
.../org/apache/camel/rx/CamelOperatorTest.java | 53 +++++++++++++++++---
.../src/test/resources/log4j.properties | 2 +-
5 files changed, 87 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rx/pom.xml b/components/camel-rx/pom.xml
index cb4827f..63baf5f 100644
--- a/components/camel-rx/pom.xml
+++ b/components/camel-rx/pom.xml
@@ -39,6 +39,10 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-restlet</artifactId>
+ </dependency>
<dependency>
<groupId>io.reactivex</groupId>
@@ -62,6 +66,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.jayway.restassured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <version>2.3.0</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index 2a6fa3a..917f069 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -19,14 +19,13 @@ package org.apache.camel.rx;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.processor.PipelineHelper;
import org.apache.camel.util.ServiceHelper;
import rx.Observable;
import rx.Subscriber;
-public class CamelOperator implements Observable.Operator<Message, Message> {
+public class CamelOperator implements Observable.Operator<Exchange, Exchange> {
private ProducerTemplate producerTemplate;
private Endpoint endpoint;
@@ -44,8 +43,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
}
@Override
- public Subscriber<? super Message> call(final Subscriber<? super Message> s) {
- return new Subscriber<Message>(s) {
+ public Subscriber<? super Exchange> call(final Subscriber<? super Exchange> s) {
+ return new Subscriber<Exchange>(s) {
@Override
public void onCompleted() {
try {
@@ -70,19 +69,14 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
}
@Override
- public void onNext(Message item) {
+ public void onNext(Exchange item) {
if (!s.isUnsubscribed()) {
Exchange exchange = process(item);
if (exchange.getException() != null) {
s.onError(exchange.getException());
} else {
- if (exchange.hasOut()) {
- s.onNext(exchange.getOut());
- } else {
- s.onNext(exchange.getIn());
- }
+ s.onNext(PipelineHelper.createNextExchange(exchange));
}
-
}
}
};
@@ -96,10 +90,4 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
}
return exchange;
}
-
- private Exchange process(Message message) {
- Exchange exchange = endpoint.createExchange();
- exchange.setIn(message);
- return process(exchange);
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index 678c4e8..e0eb869 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
import org.apache.camel.rx.support.EndpointObservable;
import org.apache.camel.rx.support.EndpointSubscribeFunc;
import org.apache.camel.rx.support.ExchangeToBodyFunc1;
-import org.apache.camel.rx.support.ExchangeToMessageFunc1;
import org.apache.camel.rx.support.ObserverSender;
import org.apache.camel.util.CamelContextHelper;
import rx.Observable;
@@ -62,7 +61,7 @@ public class ReactiveCamel {
* to be processed using <a href="https://rx.codeplex.com/">Reactive Extensions</a>
*/
public Observable<Message> toObservable(Endpoint endpoint) {
- return createEndpointObservable(endpoint, ExchangeToMessageFunc1.getInstance());
+ return toObservable(endpoint, Message.class);
}
/**
@@ -93,6 +92,20 @@ public class ReactiveCamel {
}
/**
+ * Convenience method for beginning the route
+ */
+ public Observable<Exchange> from(Endpoint endpoint) {
+ return createEndpointObservable(endpoint);
+ }
+
+ /**
+ * Convenience method for beginning the route
+ */
+ public Observable<Exchange> from(String uri) {
+ return from(endpoint(uri));
+ }
+
+ /**
* Convenience method for creating CamelOperator instances
*/
public CamelOperator to(String uri) throws Exception {
@@ -124,4 +137,15 @@ public class ReactiveCamel {
return new EndpointObservable<T>(endpoint, func);
}
+ /**
+ * Return a newly created {@link Observable} without conversion
+ */
+ protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
+ return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<>(endpoint, new Func1<Exchange, Exchange>() {
+ @Override
+ public Exchange call(Exchange exchange) {
+ return exchange;
+ }
+ }));
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
index f0bacc3..8d09858 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -18,15 +18,18 @@ package org.apache.camel.rx;
import java.util.concurrent.TimeUnit;
-import org.apache.camel.Message;
+import org.apache.camel.Exchange;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Observable;
import rx.Subscription;
+import rx.functions.Func1;
import rx.observables.ConnectableObservable;
+import static com.jayway.restassured.RestAssured.*;
+import static org.hamcrest.Matchers.*;
+
/**
*/
public class CamelOperatorTest extends RxTestSupport {
@@ -37,30 +40,64 @@ public class CamelOperatorTest extends RxTestSupport {
final MockEndpoint mockEndpoint1 = camelContext.getEndpoint("mock:results1", MockEndpoint.class);
final MockEndpoint mockEndpoint2 = camelContext.getEndpoint("mock:results2", MockEndpoint.class);
final MockEndpoint mockEndpoint3 = camelContext.getEndpoint("mock:results3", MockEndpoint.class);
+ final MockEndpoint mockEndpoint4 = camelContext.getEndpoint("mock:results4", MockEndpoint.class);
mockEndpoint1.expectedMessageCount(2);
mockEndpoint2.expectedMessageCount(1);
mockEndpoint3.expectedMessageCount(1);
+ mockEndpoint4.expectedMessageCount(2);
- ConnectableObservable<Message> route = reactiveCamel.toObservable("direct:start")
+ // Define an InOnly route
+ ConnectableObservable<Exchange> inOnly = reactiveCamel.from("direct:start")
.lift(new CamelOperator(mockEndpoint1))
- .lift(new CamelOperator(camelContext, "log:foo"))
+ .lift(new CamelOperator(camelContext, "log:inOnly"))
.debounce(1, TimeUnit.SECONDS)
.lift(reactiveCamel.to(mockEndpoint2))
.lift(reactiveCamel.to("mock:results3"))
.publish();
// Start the route
- Subscription routeSubscription = route.connect();
+ Subscription inSubscription = inOnly.connect();
// Send two test messages
- producerTemplate.sendBody("direct:start", "<test/>");
- producerTemplate.sendBody("direct:start", "<test/>");
+ producerTemplate.sendBody("direct:start", "<test1/>");
+ producerTemplate.sendBody("direct:start", "<test2/>");
+
+ // Define an InOut route
+ ConnectableObservable<Exchange> inOut = reactiveCamel.from("restlet:http://localhost:9080/test?restletMethod=POST")
+ .map(new Func1<Exchange, Exchange>() { // Convert body to String
+ @Override
+ public Exchange call(Exchange exchange) {
+ exchange.getIn().setBody(exchange.getIn().getBody(String.class));
+ return exchange;
+ }
+ })
+ .lift(reactiveCamel.to("log:inOut"))
+ .map(new Func1<Exchange, Exchange>() { // Change body for response
+ @Override
+ public Exchange call(Exchange exchange) {
+ exchange.getIn().setBody(exchange.getIn().getBody(String.class) + " back");
+ return exchange;
+ }
+ })
+ .lift(reactiveCamel.to(mockEndpoint4))
+ .publish();
+
+ // Start the route
+ Subscription inoutSubscription = inOut.connect();
+
+ // Send two messages and check the responses
+ given().body("hello").when().post("http://localhost:9080/test").then().assertThat().body(containsString("hello back"));
+ given().body("holla").when().post("http://localhost:9080/test").then().assertThat().body(containsString("holla back"));
mockEndpoint1.assertIsSatisfied();
mockEndpoint2.assertIsSatisfied();
mockEndpoint3.assertIsSatisfied();
+ mockEndpoint4.assertIsSatisfied();
+
+ // Stop the route
+ inSubscription.unsubscribe();
// Stop the route
- routeSubscription.unsubscribe();
+ inoutSubscription.unsubscribe();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/resources/log4j.properties b/components/camel-rx/src/test/resources/log4j.properties
index 747baca..4fbe6ae 100644
--- a/components/camel-rx/src/test/resources/log4j.properties
+++ b/components/camel-rx/src/test/resources/log4j.properties
@@ -32,5 +32,5 @@ log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.file.file=target/camel-tx-test.log
+log4j.appender.file.file=target/camel-rx-test.log
log4j.appender.file.append=true
[2/2] camel git commit: CAMEL-7833 clean up the pom file of camel-rx
Posted by ni...@apache.org.
CAMEL-7833 clean up the pom file of camel-rx
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9020df3b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9020df3b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9020df3b
Branch: refs/heads/master
Commit: 9020df3be5a0e66e95ca82441b91b9575581c54a
Parents: 33ebe71
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Apr 6 15:37:43 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Apr 6 15:38:56 2015 +0800
----------------------------------------------------------------------
components/camel-rx/pom.xml | 11 ++++++-----
.../test/java/org/apache/camel/rx/CamelOperatorTest.java | 4 ++--
2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9020df3b/components/camel-rx/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rx/pom.xml b/components/camel-rx/pom.xml
index 63baf5f..29f42dd 100644
--- a/components/camel-rx/pom.xml
+++ b/components/camel-rx/pom.xml
@@ -39,11 +39,7 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-restlet</artifactId>
- </dependency>
-
+
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
@@ -53,6 +49,10 @@
<!-- test dependencies -->
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-restlet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<scope>test</scope>
</dependency>
@@ -70,6 +70,7 @@
<groupId>com.jayway.restassured</groupId>
<artifactId>rest-assured</artifactId>
<version>2.3.0</version>
+ <scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/camel/blob/9020df3b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
index 8d09858..f9e30d9 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -27,8 +27,8 @@ import rx.Subscription;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;
-import static com.jayway.restassured.RestAssured.*;
-import static org.hamcrest.Matchers.*;
+import static com.jayway.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.containsString;
/**
*/