You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2013/10/28 22:18:20 UTC
git commit: Let's make clean use of Java generics in the camel-rx
component codebase,
consequently removed all the @SuppressWarnings occurrences and polished its
codebase as well.
Updated Branches:
refs/heads/master 28d3be75e -> 5d06437e5
Let's make clean use of Java generics in the camel-rx component codebase, consequently removed all the @SuppressWarnings occurrences and polished its codebase as well.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d06437e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d06437e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d06437e
Branch: refs/heads/master
Commit: 5d06437e518fd1ab93ced8ae2ebf9bcbc5be0a0b
Parents: 28d3be7
Author: Babak Vahdat <bv...@apache.org>
Authored: Mon Oct 28 22:18:09 2013 +0100
Committer: Babak Vahdat <bv...@apache.org>
Committed: Mon Oct 28 22:18:09 2013 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/impl/DefaultDebugger.java | 2 +-
.../main/java/org/apache/camel/rx/ObservableBody.java | 3 ++-
.../main/java/org/apache/camel/rx/ReactiveCamel.java | 12 +++++-------
.../apache/camel/rx/support/EndpointSubscription.java | 6 +++---
.../apache/camel/rx/support/ObservableProcessor.java | 11 ++++++-----
.../org/apache/camel/rx/support/ObserverSender.java | 7 +++++--
.../apache/camel/rx/support/ProcessorToObserver.java | 4 ++--
.../java/org/apache/camel/rx/ObservableBodyTest.java | 4 ++++
.../java/org/apache/camel/rx/ObservableMessageTest.java | 4 ++++
.../src/test/java/org/apache/camel/rx/Order.java | 5 +++--
.../src/test/java/org/apache/camel/rx/SendToTest.java | 2 +-
.../org/apache/camel/rx/ToObservableAndMapTest.java | 1 +
.../java/org/apache/camel/rx/ToObservableBodyTest.java | 2 ++
13 files changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
index f487fb5..84e91dd 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
@@ -122,7 +122,7 @@ public class DefaultDebugger implements Debugger, CamelContextAware {
}
public void addSingleStepBreakpoint(final Breakpoint breakpoint) {
- addSingleStepBreakpoint(breakpoint, null);
+ addSingleStepBreakpoint(breakpoint);
}
public void addSingleStepBreakpoint(final Breakpoint breakpoint, Condition... conditions) {
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
index 0dc05af..737d255 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
@@ -28,10 +28,11 @@ public abstract class ObservableBody<T> extends ObservableProcessor<T> {
private final Class<T> bodyType;
public ObservableBody(Class<T> bodyType) {
- super(new ExchangeToBodyFunc1(bodyType));
+ super(new ExchangeToBodyFunc1<T>(bodyType));
this.bodyType = bodyType;
}
+ @Override
public String toString() {
return "ObservableBody[" + bodyType.getName() + "]";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index 769c39e..5180bb4 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -86,10 +86,9 @@ public class ReactiveCamel {
/**
* Sends events on the given {@link Observable} to the given camel endpoint
*/
- @SuppressWarnings("unchecked")
public <T> void sendTo(Observable<T> observable, Endpoint endpoint) {
try {
- ObserverSender observer = new ObserverSender(endpoint);
+ ObserverSender<T> observer = new ObserverSender<T>(endpoint);
observable.subscribe(observer);
} catch (Exception e) {
throw new RuntimeCamelRxException(e);
@@ -108,16 +107,15 @@ public class ReactiveCamel {
* Returns a newly created {@link Observable} given a function which converts
* the {@link Exchange} from the Camel consumer to the required type
*/
- @SuppressWarnings("unchecked")
protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
final Func1<Exchange, T> converter) {
- Observable.OnSubscribeFunc<Message> func = new Observable.OnSubscribeFunc<Message>() {
+ Observable.OnSubscribeFunc<T> func = new Observable.OnSubscribeFunc<T>() {
@Override
- public Subscription onSubscribe(Observer<? super Message> observer) {
- return new EndpointSubscription(endpoint, observer, converter);
+ public Subscription onSubscribe(Observer<? super T> observer) {
+ return new EndpointSubscription<T>(endpoint, observer, converter);
}
};
- return new EndpointObservable(endpoint, func);
+ return new EndpointObservable<T>(endpoint, func);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 1684838..349a898 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -31,10 +31,10 @@ import rx.util.functions.Func1;
*/
public class EndpointSubscription<T> implements Subscription {
private final Endpoint endpoint;
- private final Observer<T> observer;
+ private final Observer<? super T> observer;
private Consumer consumer;
- public EndpointSubscription(Endpoint endpoint, final Observer<T> observer,
+ public EndpointSubscription(Endpoint endpoint, final Observer<? super T> observer,
final Func1<Exchange, T> func) {
this.endpoint = endpoint;
this.observer = observer;
@@ -72,7 +72,7 @@ public class EndpointSubscription<T> implements Subscription {
return endpoint;
}
- public Observer<T> getObserver() {
+ public Observer<? super T> getObserver() {
return observer;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
index 056f591..cf03583 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
@@ -30,14 +30,14 @@ import rx.util.functions.Func1;
* so that the messages can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
*/
public abstract class ObservableProcessor<T> extends ServiceSupport implements Processor {
- private final Subject observable = PublishSubject.create();
- private final ProcessorToObserver processor;
+ private final Subject<T, T> observable = PublishSubject.create();
+ private final ProcessorToObserver<T> processor;
- @SuppressWarnings("unchecked")
protected ObservableProcessor(Func1<Exchange, T> func) {
- this.processor = new ProcessorToObserver(func, observable);
+ this.processor = new ProcessorToObserver<T>(func, observable);
}
+ @Override
public void process(Exchange exchange) throws Exception {
processor.process(exchange);
}
@@ -46,7 +46,6 @@ public abstract class ObservableProcessor<T> extends ServiceSupport implements P
* Returns the {@link Observable} for this {@link Processor} so that the messages that are received
* can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
*/
- @SuppressWarnings("unchecked")
public Observable<T> getObservable() {
return observable;
}
@@ -57,10 +56,12 @@ public abstract class ObservableProcessor<T> extends ServiceSupport implements P
*/
protected abstract void configure(Observable<T> observable);
+ @Override
protected void doStart() throws Exception {
configure(getObservable());
}
+ @Override
protected void doStop() throws Exception {
// noop
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
index 435ddd8..35f1048 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
@@ -27,7 +27,7 @@ import rx.Observer;
/**
* An {@link Observer} which sends events to a given {@link Endpoint}
*/
-public class ObserverSender implements Observer {
+public class ObserverSender<T> implements Observer<T> {
private Producer producer;
public ObserverSender(Endpoint endpoint) throws Exception {
@@ -35,6 +35,7 @@ public class ObserverSender implements Observer {
ServiceHelper.startService(producer);
}
+ @Override
public void onCompleted() {
if (producer != null) {
try {
@@ -47,13 +48,15 @@ public class ObserverSender implements Observer {
}
}
+ @Override
public void onError(Throwable e) {
Exchange exchange = producer.createExchange();
exchange.setException(e);
send(exchange);
}
- public void onNext(Object o) {
+ @Override
+ public void onNext(T o) {
Exchange exchange = producer.createExchange();
exchange.getIn().setBody(o);
send(exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
index 57d0c36..fff8668 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
@@ -29,9 +29,9 @@ import rx.util.functions.Func1;
*/
public class ProcessorToObserver<T> implements Processor {
private final Func1<Exchange, T> func;
- private final Observer<T> observer;
+ private final Observer<? super T> observer;
- public ProcessorToObserver(Func1<Exchange, T> func, Observer<T> observer) {
+ public ProcessorToObserver(Func1<Exchange, T> func, Observer<? super T> observer) {
this.func = func;
this.observer = observer;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
index 3fca044..5b2cf41 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
@@ -52,13 +52,16 @@ public class ObservableBodyTest extends CamelTestSupport {
super(String.class);
}
+ @Override
protected void configure(Observable<String> observable) {
// lets process the messages using the RX API
observable.map(new Func1<String, String>() {
+ @Override
public String call(String body) {
return "Hello " + body;
}
}).subscribe(new Action1<String>() {
+ @Override
public void call(String body) {
template.sendBody(resultEndpoint, body);
}
@@ -69,6 +72,7 @@ public class ObservableBodyTest extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
+ @Override
public void configure() {
from("direct:start").process(observableBody);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
index b9b3851..d952797 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
@@ -49,13 +49,16 @@ public class ObservableMessageTest extends CamelTestSupport {
}
public class MyObservableMessage extends ObservableMessage {
+ @Override
protected void configure(Observable<Message> observable) {
// lets process the messages using the RX API
observable.map(new Func1<Message, String>() {
+ @Override
public String call(Message message) {
return "Hello " + message.getBody(String.class);
}
}).subscribe(new Action1<String>() {
+ @Override
public void call(String body) {
template.sendBody(resultEndpoint, body);
}
@@ -66,6 +69,7 @@ public class ObservableMessageTest extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
+ @Override
public void configure() {
from("direct:start").process(observableMessage);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java b/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
index 8f866ea..5d98e20 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
@@ -19,14 +19,15 @@ package org.apache.camel.rx;
/**
*/
public class Order {
- private String id;
- private double amount;
+ private final String id;
+ private final double amount;
public Order(String id, double amount) {
this.amount = amount;
this.id = id;
}
+ @Override
public String toString() {
return "Order[id " + id + ", amount " + amount + "]";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
index 83de696..634c7a9 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
@@ -30,7 +30,7 @@ public class SendToTest extends RxTestSupport {
Observable<Order> someObservable = Observable.from(expectedBodies);
final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class);
- mockEndpoint.expectedBodiesReceived(expectedBodies);
+ mockEndpoint.expectedBodiesReceived((Object[]) expectedBodies);
// lets send events on the observable to the camel endpoint
reactiveCamel.sendTo(someObservable, "mock:results");
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
index e173bcd..b159918 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
@@ -41,6 +41,7 @@ public class ToObservableAndMapTest extends RxTestSupport {
// transform the stream
Observable<String> observable = observableMessage.map(new Func1<Message, String>() {
+ @Override
public String call(Message message) {
return "Transformed value: headers " + message.getHeaders();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
index f2449c3..6974e8b 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
@@ -38,10 +38,12 @@ public class ToObservableBodyTest extends RxTestSupport {
// lets consume, filter and map events
Observable<Order> observable = reactiveCamel.toObservable("seda:orders", Order.class);
Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
+ @Override
public Boolean call(Order order) {
return order.getAmount() > 100.0;
}
}).map(new Func1<Order, String>() {
+ @Override
public String call(Order order) {
return order.getId();
}