You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2013/10/28 22:18:20 UTC

git commit: Let's make use of a clean java generics by the camel-rx component codebase, consequently removed all the @SuppressWarnings occurrences and polished it's codebase as well.

Updated Branches:
  refs/heads/master 28d3be75e -> 5d06437e5


Let's make use of a clean java generics by the camel-rx component codebase, consequently removed  all the @SuppressWarnings occurrences and polished it's codebase as well.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d06437e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d06437e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d06437e

Branch: refs/heads/master
Commit: 5d06437e518fd1ab93ced8ae2ebf9bcbc5be0a0b
Parents: 28d3be7
Author: Babak Vahdat <bv...@apache.org>
Authored: Mon Oct 28 22:18:09 2013 +0100
Committer: Babak Vahdat <bv...@apache.org>
Committed: Mon Oct 28 22:18:09 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/impl/DefaultDebugger.java     |  2 +-
 .../main/java/org/apache/camel/rx/ObservableBody.java   |  3 ++-
 .../main/java/org/apache/camel/rx/ReactiveCamel.java    | 12 +++++-------
 .../apache/camel/rx/support/EndpointSubscription.java   |  6 +++---
 .../apache/camel/rx/support/ObservableProcessor.java    | 11 ++++++-----
 .../org/apache/camel/rx/support/ObserverSender.java     |  7 +++++--
 .../apache/camel/rx/support/ProcessorToObserver.java    |  4 ++--
 .../java/org/apache/camel/rx/ObservableBodyTest.java    |  4 ++++
 .../java/org/apache/camel/rx/ObservableMessageTest.java |  4 ++++
 .../src/test/java/org/apache/camel/rx/Order.java        |  5 +++--
 .../src/test/java/org/apache/camel/rx/SendToTest.java   |  2 +-
 .../org/apache/camel/rx/ToObservableAndMapTest.java     |  1 +
 .../java/org/apache/camel/rx/ToObservableBodyTest.java  |  2 ++
 13 files changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
index f487fb5..84e91dd 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java
@@ -122,7 +122,7 @@ public class DefaultDebugger implements Debugger, CamelContextAware {
     }
 
     public void addSingleStepBreakpoint(final Breakpoint breakpoint) {
-        addSingleStepBreakpoint(breakpoint, null);
+        addSingleStepBreakpoint(breakpoint);
     }
 
     public void addSingleStepBreakpoint(final Breakpoint breakpoint, Condition... conditions) {

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
index 0dc05af..737d255 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
@@ -28,10 +28,11 @@ public abstract class ObservableBody<T> extends ObservableProcessor<T> {
     private final Class<T> bodyType;
 
     public ObservableBody(Class<T> bodyType) {
-        super(new ExchangeToBodyFunc1(bodyType));
+        super(new ExchangeToBodyFunc1<T>(bodyType));
         this.bodyType = bodyType;
     }
 
+    @Override
     public String toString() {
         return "ObservableBody[" + bodyType.getName() + "]";
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index 769c39e..5180bb4 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -86,10 +86,9 @@ public class ReactiveCamel {
     /**
      * Sends events on the given {@link Observable} to the given camel endpoint
      */
-    @SuppressWarnings("unchecked")
     public <T> void sendTo(Observable<T> observable, Endpoint endpoint) {
         try {
-            ObserverSender observer = new ObserverSender(endpoint);
+            ObserverSender<T> observer = new ObserverSender<T>(endpoint);
             observable.subscribe(observer);
         } catch (Exception e) {
             throw new RuntimeCamelRxException(e);
@@ -108,16 +107,15 @@ public class ReactiveCamel {
      * Returns a newly created {@link Observable} given a function which converts
      * the {@link Exchange} from the Camel consumer to the required type
      */
-    @SuppressWarnings("unchecked")
     protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
                                                          final Func1<Exchange, T> converter) {
-        Observable.OnSubscribeFunc<Message> func = new Observable.OnSubscribeFunc<Message>() {
+        Observable.OnSubscribeFunc<T> func = new Observable.OnSubscribeFunc<T>() {
             @Override
-            public Subscription onSubscribe(Observer<? super Message> observer) {
-                return new EndpointSubscription(endpoint, observer, converter);
+            public Subscription onSubscribe(Observer<? super T> observer) {
+                return new EndpointSubscription<T>(endpoint, observer, converter);
             }
         };
-        return new EndpointObservable(endpoint, func);
+        return new EndpointObservable<T>(endpoint, func);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 1684838..349a898 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -31,10 +31,10 @@ import rx.util.functions.Func1;
  */
 public class EndpointSubscription<T> implements Subscription {
     private final Endpoint endpoint;
-    private final Observer<T> observer;
+    private final Observer<? super T> observer;
     private Consumer consumer;
 
-    public EndpointSubscription(Endpoint endpoint, final Observer<T> observer,
+    public EndpointSubscription(Endpoint endpoint, final Observer<? super T> observer,
                                 final Func1<Exchange, T> func) {
         this.endpoint = endpoint;
         this.observer = observer;
@@ -72,7 +72,7 @@ public class EndpointSubscription<T> implements Subscription {
         return endpoint;
     }
 
-    public Observer<T> getObserver() {
+    public Observer<? super T> getObserver() {
         return observer;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
index 056f591..cf03583 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
@@ -30,14 +30,14 @@ import rx.util.functions.Func1;
  * so that the messages can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
  */
 public abstract class ObservableProcessor<T> extends ServiceSupport implements Processor {
-    private final Subject observable = PublishSubject.create();
-    private final ProcessorToObserver processor;
+    private final Subject<T, T> observable = PublishSubject.create();
+    private final ProcessorToObserver<T> processor;
 
-    @SuppressWarnings("unchecked")
     protected ObservableProcessor(Func1<Exchange, T> func) {
-        this.processor = new ProcessorToObserver(func, observable);
+        this.processor = new ProcessorToObserver<T>(func, observable);
     }
 
+    @Override
     public void process(Exchange exchange) throws Exception {
         processor.process(exchange);
     }
@@ -46,7 +46,6 @@ public abstract class ObservableProcessor<T> extends ServiceSupport implements P
      * Returns the {@link Observable} for this {@link Processor} so that the messages that are received
      * can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
      */
-    @SuppressWarnings("unchecked")
     public Observable<T> getObservable() {
         return observable;
     }
@@ -57,10 +56,12 @@ public abstract class ObservableProcessor<T> extends ServiceSupport implements P
      */
     protected abstract void configure(Observable<T> observable);
 
+    @Override
     protected void doStart() throws Exception {
         configure(getObservable());
     }
 
+    @Override
     protected void doStop() throws Exception {
         // noop
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
index 435ddd8..35f1048 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
@@ -27,7 +27,7 @@ import rx.Observer;
 /**
  * An {@link Observer} which sends events to a given {@link Endpoint}
  */
-public class ObserverSender implements Observer {
+public class ObserverSender<T> implements Observer<T> {
     private Producer producer;
 
     public ObserverSender(Endpoint endpoint) throws Exception {
@@ -35,6 +35,7 @@ public class ObserverSender implements Observer {
         ServiceHelper.startService(producer);
     }
 
+    @Override
     public void onCompleted() {
         if (producer != null) {
             try {
@@ -47,13 +48,15 @@ public class ObserverSender implements Observer {
         }
     }
 
+    @Override
     public void onError(Throwable e) {
         Exchange exchange = producer.createExchange();
         exchange.setException(e);
         send(exchange);
     }
 
-    public void onNext(Object o) {
+    @Override
+    public void onNext(T o) {
         Exchange exchange = producer.createExchange();
         exchange.getIn().setBody(o);
         send(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
index 57d0c36..fff8668 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
@@ -29,9 +29,9 @@ import rx.util.functions.Func1;
  */
 public class ProcessorToObserver<T> implements Processor {
     private final Func1<Exchange, T> func;
-    private final Observer<T> observer;
+    private final Observer<? super T> observer;
 
-    public ProcessorToObserver(Func1<Exchange, T> func, Observer<T> observer) {
+    public ProcessorToObserver(Func1<Exchange, T> func, Observer<? super T> observer) {
         this.func = func;
         this.observer = observer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
index 3fca044..5b2cf41 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
@@ -52,13 +52,16 @@ public class ObservableBodyTest extends CamelTestSupport {
             super(String.class);
         }
 
+        @Override
         protected void configure(Observable<String> observable) {
             // lets process the messages using the RX API
             observable.map(new Func1<String, String>() {
+                @Override
                 public String call(String body) {
                     return "Hello " + body;
                 }
             }).subscribe(new Action1<String>() {
+                @Override
                 public void call(String body) {
                     template.sendBody(resultEndpoint, body);
                 }
@@ -69,6 +72,7 @@ public class ObservableBodyTest extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
+            @Override
             public void configure() {
                 from("direct:start").process(observableBody);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
index b9b3851..d952797 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
@@ -49,13 +49,16 @@ public class ObservableMessageTest extends CamelTestSupport {
     }
 
     public class MyObservableMessage extends ObservableMessage {
+        @Override
         protected void configure(Observable<Message> observable) {
             // lets process the messages using the RX API
             observable.map(new Func1<Message, String>() {
+                @Override
                 public String call(Message message) {
                     return "Hello " + message.getBody(String.class);
                 }
             }).subscribe(new Action1<String>() {
+                @Override
                 public void call(String body) {
                     template.sendBody(resultEndpoint, body);
                 }
@@ -66,6 +69,7 @@ public class ObservableMessageTest extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
+            @Override
             public void configure() {
                 from("direct:start").process(observableMessage);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java b/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
index 8f866ea..5d98e20 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
@@ -19,14 +19,15 @@ package org.apache.camel.rx;
 /**
  */
 public class Order {
-    private String id;
-    private double amount;
+    private final String id;
+    private final double amount;
 
     public Order(String id, double amount) {
         this.amount = amount;
         this.id = id;
     }
 
+    @Override
     public String toString() {
         return "Order[id " + id + ", amount " + amount + "]";
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
index 83de696..634c7a9 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
@@ -30,7 +30,7 @@ public class SendToTest extends RxTestSupport {
         Observable<Order> someObservable = Observable.from(expectedBodies);
 
         final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class);
-        mockEndpoint.expectedBodiesReceived(expectedBodies);
+        mockEndpoint.expectedBodiesReceived((Object[]) expectedBodies);
 
         // lets send events on the observable to the camel endpoint
         reactiveCamel.sendTo(someObservable, "mock:results");

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
index e173bcd..b159918 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
@@ -41,6 +41,7 @@ public class ToObservableAndMapTest extends RxTestSupport {
 
         // transform the stream
         Observable<String> observable = observableMessage.map(new Func1<Message, String>() {
+            @Override
             public String call(Message message) {
                 return "Transformed value: headers " + message.getHeaders();
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
index f2449c3..6974e8b 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
@@ -38,10 +38,12 @@ public class ToObservableBodyTest extends RxTestSupport {
         // lets consume, filter and map events
         Observable<Order> observable = reactiveCamel.toObservable("seda:orders", Order.class);
         Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
+            @Override
             public Boolean call(Order order) {
                 return order.getAmount() > 100.0;
             }
         }).map(new Func1<Order, String>() {
+            @Override
             public String call(Order order) {
                 return order.getId();
             }