You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/06 10:19:14 UTC
camel git commit: CAMEL-8747: camel-rx - Should leverage UoW when subscribe or observe
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x 6a45c7a3b -> 756f4a471
CAMEL-8747: camel-rx - Should leverage UoW when subscribe or observe
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/756f4a47
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/756f4a47
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/756f4a47
Branch: refs/heads/camel-2.15.x
Commit: 756f4a4713b0abd2f5d36852bed8afbc1410e5a6
Parents: 6a45c7a
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 6 10:22:12 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 6 10:22:39 2015 +0200
----------------------------------------------------------------------
.../camel/rx/support/EndpointSubscription.java | 6 +-
.../apache/camel/rx/support/ObserverSender.java | 13 ++--
.../java/org/apache/camel/rx/SendToTest.java | 4 +-
.../java/org/apache/camel/rx/SendToUoWTest.java | 67 ++++++++++++++++++++
.../apache/camel/rx/ToObservableUoWTest.java | 59 +++++++++++++++++
5 files changed, 140 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 8ecd265..593e1d4 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -22,6 +22,7 @@ import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +49,11 @@ public class EndpointSubscription<T> implements Subscription {
// lets create the consumer
Processor processor = new ProcessorToObserver<T>(func, observer);
+ // must ensure the consumer is executed in a unit of work so synchronization callbacks etc. are invoked
+ CamelInternalProcessor internal = new CamelInternalProcessor(processor);
+ internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
try {
- this.consumer = endpoint.createConsumer(processor);
+ this.consumer = endpoint.createConsumer(internal);
ServiceHelper.startService(consumer);
} catch (Exception e) {
observer.onError(e);
http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
index 35f1048..0cf5d90 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
@@ -19,19 +19,22 @@ package org.apache.camel.rx.support;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
-import org.apache.camel.rx.RuntimeCamelRxException;
-
+import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.Observer;
/**
* An {@link Observer} which sends events to a given {@link Endpoint}
*/
public class ObserverSender<T> implements Observer<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(ObserverSender.class);
+
private Producer producer;
public ObserverSender(Endpoint endpoint) throws Exception {
- this.producer = endpoint.createProducer();
+ this.producer = new UnitOfWorkProducer(endpoint.createProducer());
ServiceHelper.startService(producer);
}
@@ -41,7 +44,7 @@ public class ObserverSender<T> implements Observer<T> {
try {
ServiceHelper.stopService(producer);
} catch (Exception e) {
- throw new RuntimeCamelRxException(e);
+ LOG.warn("Error stopping producer: " + producer + " due " + e.getMessage() + ". This exception is ignored.", e);
} finally {
producer = null;
}
@@ -66,7 +69,7 @@ public class ObserverSender<T> implements Observer<T> {
try {
producer.process(exchange);
} catch (Exception e) {
- throw new RuntimeCamelRxException(e);
+ exchange.setException(e);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
index 634c7a9..d905f5a 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
@@ -21,9 +21,8 @@ import org.junit.Test;
import rx.Observable;
-/**
- */
public class SendToTest extends RxTestSupport {
+
@Test
public void testSendObservableToEndpoint() throws Exception {
Order[] expectedBodies = {new Order("o1", 1.10), new Order("o2", 2.20), new Order("o3", 3.30)};
@@ -36,6 +35,5 @@ public class SendToTest extends RxTestSupport {
reactiveCamel.sendTo(someObservable, "mock:results");
mockEndpoint.assertIsSatisfied();
-
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java
new file mode 100644
index 0000000..0852c3d
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.junit.Assert;
+import org.junit.Test;
+import rx.Observable;
+
+public class SendToUoWTest extends RxTestSupport {
+
+ private MyOnCompletion onCompletion = new MyOnCompletion();
+
+ @Test
+ public void testSendObservableToEndpoint() throws Exception {
+ Order[] expectedBodies = {new Order("o1", 1.10), new Order("o2", 2.20), new Order("o3", 3.30)};
+ Observable<Order> someObservable = Observable.from(expectedBodies);
+
+ final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class);
+ mockEndpoint.expectedBodiesReceived((Object[]) expectedBodies);
+
+ mockEndpoint.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.addOnCompletion(onCompletion);
+ }
+ });
+
+ // lets send events on the observable to the camel endpoint
+ reactiveCamel.sendTo(someObservable, "mock:results");
+
+ mockEndpoint.assertIsSatisfied();
+
+ Assert.assertEquals(3, onCompletion.getDone());
+ }
+
+ private static class MyOnCompletion extends SynchronizationAdapter {
+
+ private int done;
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ done++;
+ }
+
+ public int getDone() {
+ return done;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java
new file mode 100644
index 0000000..88a5b2f
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.FileUtil;
+import org.junit.Test;
+import rx.Observable;
+import rx.functions.Action1;
+
+public class ToObservableUoWTest extends RxTestSupport {
+
+ @Override
+ public void init() throws Exception {
+ FileUtil.removeDir(new File("target/foo"));
+ super.init();
+ }
+
+ @Test
+ public void testConsumeUoW() throws Exception {
+ final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class);
+ mockEndpoint.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+
+ Observable<Message> observable = reactiveCamel.toObservable("file://target/foo?move=done");
+ observable.subscribe(new Action1<Message>() {
+ @Override
+ public void call(Message message) {
+ String body = message.getBody(String.class);
+ producerTemplate.sendBody("mock:results", body);
+ }
+ });
+
+ producerTemplate.sendBodyAndHeader("file://target/foo", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ producerTemplate.sendBodyAndHeader("file://target/foo", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+ mockEndpoint.expectedFileExists("target/foo/done/hello.txt");
+ mockEndpoint.expectedFileExists("target/foo/done/bye.txt");
+
+ mockEndpoint.assertIsSatisfied();
+ }
+}