You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/06 10:19:14 UTC

camel git commit: CAMEL-8747: camel-rx - Should leverage UoW when subscribe or observe

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x 6a45c7a3b -> 756f4a471


CAMEL-8747: camel-rx - Should leverage UoW when subscribe or observe


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/756f4a47
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/756f4a47
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/756f4a47

Branch: refs/heads/camel-2.15.x
Commit: 756f4a4713b0abd2f5d36852bed8afbc1410e5a6
Parents: 6a45c7a
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 6 10:22:12 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 6 10:22:39 2015 +0200

----------------------------------------------------------------------
 .../camel/rx/support/EndpointSubscription.java  |  6 +-
 .../apache/camel/rx/support/ObserverSender.java | 13 ++--
 .../java/org/apache/camel/rx/SendToTest.java    |  4 +-
 .../java/org/apache/camel/rx/SendToUoWTest.java | 67 ++++++++++++++++++++
 .../apache/camel/rx/ToObservableUoWTest.java    | 59 +++++++++++++++++
 5 files changed, 140 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 8ecd265..593e1d4 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -22,6 +22,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,8 +49,11 @@ public class EndpointSubscription<T> implements Subscription {
 
         // lets create the consumer
         Processor processor = new ProcessorToObserver<T>(func, observer);
+        // must ensure the consumer is executed in a unit of work so synchronization callbacks etc. are invoked
+        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
+        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
         try {
-            this.consumer = endpoint.createConsumer(processor);
+            this.consumer = endpoint.createConsumer(internal);
             ServiceHelper.startService(consumer);
         } catch (Exception e) {
             observer.onError(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
index 35f1048..0cf5d90 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
@@ -19,19 +19,22 @@ package org.apache.camel.rx.support;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
-import org.apache.camel.rx.RuntimeCamelRxException;
-
+import org.apache.camel.processor.UnitOfWorkProducer;
 import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import rx.Observer;
 
 /**
  * An {@link Observer} which sends events to a given {@link Endpoint}
  */
 public class ObserverSender<T> implements Observer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(ObserverSender.class);
+
     private Producer producer;
 
     public ObserverSender(Endpoint endpoint) throws Exception {
-        this.producer = endpoint.createProducer();
+        this.producer = new UnitOfWorkProducer(endpoint.createProducer());
         ServiceHelper.startService(producer);
     }
 
@@ -41,7 +44,7 @@ public class ObserverSender<T> implements Observer<T> {
             try {
                 ServiceHelper.stopService(producer);
             } catch (Exception e) {
-                throw new RuntimeCamelRxException(e);
+                LOG.warn("Error stopping producer: " + producer + " due " + e.getMessage() + ". This exception is ignored.", e);
             } finally {
                 producer = null;
             }
@@ -66,7 +69,7 @@ public class ObserverSender<T> implements Observer<T> {
         try {
             producer.process(exchange);
         } catch (Exception e) {
-            throw new RuntimeCamelRxException(e);
+            exchange.setException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
index 634c7a9..d905f5a 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
@@ -21,9 +21,8 @@ import org.junit.Test;
 
 import rx.Observable;
 
-/**
- */
 public class SendToTest extends RxTestSupport {
+
     @Test
     public void testSendObservableToEndpoint() throws Exception {
         Order[] expectedBodies = {new Order("o1", 1.10), new Order("o2", 2.20), new Order("o3", 3.30)};
@@ -36,6 +35,5 @@ public class SendToTest extends RxTestSupport {
         reactiveCamel.sendTo(someObservable, "mock:results");
 
         mockEndpoint.assertIsSatisfied();
-
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java
new file mode 100644
index 0000000..0852c3d
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.junit.Assert;
+import org.junit.Test;
+import rx.Observable;
+
+public class SendToUoWTest extends RxTestSupport { // checks that on-completion callbacks fire for exchanges sent via sendTo
+
+    private MyOnCompletion onCompletion = new MyOnCompletion(); // single shared callback; counts onComplete invocations
+
+    @Test
+    public void testSendObservableToEndpoint() throws Exception {
+        Order[] expectedBodies = {new Order("o1", 1.10), new Order("o2", 2.20), new Order("o3", 3.30)};
+        Observable<Order> someObservable = Observable.from(expectedBodies);
+
+        final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class);
+        mockEndpoint.expectedBodiesReceived((Object[]) expectedBodies);
+
+        mockEndpoint.whenAnyExchangeReceived(new Processor() { // processor the mock applies to the exchanges it receives
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.addOnCompletion(onCompletion); // register the shared counting callback on this exchange
+            }
+        });
+
+        // lets send events on the observable to the camel endpoint
+        reactiveCamel.sendTo(someObservable, "mock:results");
+
+        mockEndpoint.assertIsSatisfied();
+
+        Assert.assertEquals(3, onCompletion.getDone()); // three orders were sent, so onComplete must have run three times
+    }
+
+    private static class MyOnCompletion extends SynchronizationAdapter { // callback that only counts completions
+
+        private int done; // number of onComplete calls seen so far
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            done++;
+        }
+
+        public int getDone() {
+            return done;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java
new file mode 100644
index 0000000..88a5b2f
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.FileUtil;
+import org.junit.Test;
+import rx.Observable;
+import rx.functions.Action1;
+
+public class ToObservableUoWTest extends RxTestSupport { // consumes files through toObservable and checks they end up in the done folder
+
+    @Override
+    public void init() throws Exception {
+        FileUtil.removeDir(new File("target/foo")); // start from a clean directory before the base setup runs
+        super.init();
+    }
+
+    @Test
+    public void testConsumeUoW() throws Exception {
+        final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class);
+        mockEndpoint.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+
+        Observable<Message> observable = reactiveCamel.toObservable("file://target/foo?move=done");
+        observable.subscribe(new Action1<Message>() {
+            @Override
+            public void call(Message message) {
+                String body = message.getBody(String.class);
+                producerTemplate.sendBody("mock:results", body); // forward each consumed file body to the mock endpoint
+            }
+        });
+
+        producerTemplate.sendBodyAndHeader("file://target/foo", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        producerTemplate.sendBodyAndHeader("file://target/foo", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        mockEndpoint.expectedFileExists("target/foo/done/hello.txt"); // presumably moved here by move=done once the exchange completes - confirm
+        mockEndpoint.expectedFileExists("target/foo/done/bye.txt");
+
+        mockEndpoint.assertIsSatisfied();
+    }
+}