You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/02 09:48:42 UTC
[1/4] camel git commit: CAMEL-7833 Added CamelOperator for lift()
Repository: camel
Updated Branches:
refs/heads/master 33e7087fb -> d85f2f0c5
CAMEL-7833 Added CamelOperator for lift()
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/081e8a73
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/081e8a73
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/081e8a73
Branch: refs/heads/master
Commit: 081e8a73195ce5776b3581b31ed15cbcadc68624
Parents: 33e7087
Author: Jyrki Ruuskanen <yu...@kotikone.fi>
Authored: Wed Apr 1 17:54:26 2015 +0300
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:03 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/camel/rx/CamelOperator.java | 79 ++++++++++++++++++++
1 file changed, 79 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/081e8a73/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
new file mode 100644
index 0000000..c218776
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -0,0 +1,79 @@
+package org.apache.camel.rx;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+import rx.Observable;
+import rx.Subscriber;
+
+public class CamelOperator implements Observable.Operator<Message, Message> {
+
+ private Producer producer;
+
+ public CamelOperator(CamelContext context, String uri) throws Exception {
+ this(context.getEndpoint(uri));
+ }
+
+ public CamelOperator(Endpoint endpoint) throws Exception {
+ this.producer = endpoint.createProducer();
+ ServiceHelper.startService(producer);
+ }
+
+ @Override
+ public Subscriber<? super Message> call(final Subscriber<? super Message> s) {
+ return new Subscriber<Message>(s) {
+ @Override
+ public void onCompleted() {
+ try {
+ ServiceHelper.stopService(producer);
+ } catch (Exception e) {
+ throw new RuntimeCamelRxException(e);
+ } finally {
+ producer = null;
+ }
+ if (!s.isUnsubscribed()) {
+ s.onCompleted();
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ Exchange exchange = producer.createExchange();
+ exchange.setException(e);
+ process(exchange);
+ if (!s.isUnsubscribed()) {
+ s.onError(e);
+ }
+ }
+
+ @Override
+ public void onNext(Message item) {
+ if (!s.isUnsubscribed()) {
+ s.onNext(process(item));
+ }
+ }
+ };
+ }
+
+ private Exchange process(Exchange exchange) {
+ try {
+ producer.process(exchange);
+ if (exchange.hasOut()) {
+ exchange.setIn(exchange.getOut());
+ exchange.setOut(null);
+ }
+ } catch (Exception e) {
+ throw new RuntimeCamelRxException(e);
+ }
+ return exchange;
+ }
+
+ private Message process(Message message) {
+ Exchange exchange = producer.createExchange();
+ exchange.setIn(message);
+ return process(exchange).getIn();
+ }
+}
[2/4] camel git commit: CAMEL-7833 Added CamelOperatorTest
Posted by ni...@apache.org.
CAMEL-7833 Added CamelOperatorTest
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d9b777a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d9b777a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d9b777a
Branch: refs/heads/master
Commit: 2d9b777a02cbe8359b738fadadf28da3eb4b96f6
Parents: 081e8a7
Author: Jyrki Ruuskanen <yu...@kotikone.fi>
Authored: Wed Apr 1 18:34:47 2015 +0300
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/camel/rx/CamelOperator.java | 16 ++++++
.../org/apache/camel/rx/CamelOperatorTest.java | 57 ++++++++++++++++++++
2 files changed, 73 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2d9b777a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index c218776..f965388 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.rx;
import org.apache.camel.CamelContext;
http://git-wip-us.apache.org/repos/asf/camel/blob/2d9b777a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
new file mode 100644
index 0000000..8667e3b
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Message;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+
+/**
+ */
+public class CamelOperatorTest extends RxTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(CamelOperatorTest.class);
+
+ @Test
+ public void testCamelOperator() throws Exception {
+ final MockEndpoint mockEndpoint1 = camelContext.getEndpoint("mock:results1", MockEndpoint.class);
+ final MockEndpoint mockEndpoint2 = camelContext.getEndpoint("mock:results2", MockEndpoint.class);
+ final MockEndpoint mockEndpoint3 = camelContext.getEndpoint("mock:results3", MockEndpoint.class);
+ mockEndpoint1.expectedMessageCount(2);
+ mockEndpoint2.expectedMessageCount(1);
+ mockEndpoint3.expectedMessageCount(1);
+
+ Observable<Message> result = reactiveCamel.toObservable("direct:start")
+ .lift(new CamelOperator(camelContext, "mock:results1"))
+ .lift(new CamelOperator(camelContext, "log:foo"))
+ .debounce(1, TimeUnit.SECONDS)
+ .lift(new CamelOperator(mockEndpoint2));
+ reactiveCamel.sendTo(result, "mock:results3");
+
+ // Send two test messages
+ producerTemplate.sendBody("direct:start", "<test/>");
+ producerTemplate.sendBody("direct:start", "<test/>");
+
+ mockEndpoint1.assertIsSatisfied();
+ mockEndpoint2.assertIsSatisfied();
+ mockEndpoint3.assertIsSatisfied();
+ }
+}
[3/4] camel git commit: CAMEL-7833 Use ProducerTemplate to send
message and pass the exception to the subscriber
Posted by ni...@apache.org.
CAMEL-7833 Use ProducerTemplate to send message and pass the exception to the subscriber
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d85f2f0c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d85f2f0c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d85f2f0c
Branch: refs/heads/master
Commit: d85f2f0c57021066ec0d674b9c6b4b8be2ac3344
Parents: 0d9928a
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 2 15:47:48 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/camel/rx/CamelOperator.java | 48 ++++++++++++--------
1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d85f2f0c/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index f965388..2a6fa3a 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -21,21 +21,26 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
import org.apache.camel.util.ServiceHelper;
import rx.Observable;
import rx.Subscriber;
public class CamelOperator implements Observable.Operator<Message, Message> {
- private Producer producer;
+ private ProducerTemplate producerTemplate;
+ private Endpoint endpoint;
public CamelOperator(CamelContext context, String uri) throws Exception {
- this(context.getEndpoint(uri));
+ producerTemplate = context.createProducerTemplate();
+ endpoint = context.getEndpoint(uri);
+ ServiceHelper.startService(producerTemplate);
}
public CamelOperator(Endpoint endpoint) throws Exception {
- this.producer = endpoint.createProducer();
- ServiceHelper.startService(producer);
+ this.producerTemplate = endpoint.getCamelContext().createProducerTemplate();
+ this.endpoint = endpoint;
+ ServiceHelper.startService(producerTemplate);
}
@Override
@@ -44,11 +49,11 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
@Override
public void onCompleted() {
try {
- ServiceHelper.stopService(producer);
+ ServiceHelper.stopService(producerTemplate);
} catch (Exception e) {
throw new RuntimeCamelRxException(e);
} finally {
- producer = null;
+ producerTemplate = null;
}
if (!s.isUnsubscribed()) {
s.onCompleted();
@@ -57,9 +62,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
@Override
public void onError(Throwable e) {
- Exchange exchange = producer.createExchange();
- exchange.setException(e);
- process(exchange);
+ // producer cannot handler the exception
+ // so we just pass the exchange to the subscriber
if (!s.isUnsubscribed()) {
s.onError(e);
}
@@ -68,7 +72,17 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
@Override
public void onNext(Message item) {
if (!s.isUnsubscribed()) {
- s.onNext(process(item));
+ Exchange exchange = process(item);
+ if (exchange.getException() != null) {
+ s.onError(exchange.getException());
+ } else {
+ if (exchange.hasOut()) {
+ s.onNext(exchange.getOut());
+ } else {
+ s.onNext(exchange.getIn());
+ }
+ }
+
}
}
};
@@ -76,20 +90,16 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
private Exchange process(Exchange exchange) {
try {
- producer.process(exchange);
- if (exchange.hasOut()) {
- exchange.setIn(exchange.getOut());
- exchange.setOut(null);
- }
+ exchange = producerTemplate.send(endpoint, exchange);
} catch (Exception e) {
- throw new RuntimeCamelRxException(e);
+ exchange.setException(e);
}
return exchange;
}
- private Message process(Message message) {
- Exchange exchange = producer.createExchange();
+ private Exchange process(Message message) {
+ Exchange exchange = endpoint.createExchange();
exchange.setIn(message);
- return process(exchange).getIn();
+ return process(exchange);
}
}
[4/4] camel git commit: Added a comment for the
Main.getOrCreateCamelContext()
Posted by ni...@apache.org.
Added a comment for the Main.getOrCreateCamelContext()
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0d9928a2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0d9928a2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0d9928a2
Branch: refs/heads/master
Commit: 0d9928a23e3b2c89638b3ec15dd0e3bb3dc35aa6
Parents: 2d9b777
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 2 13:46:41 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800
----------------------------------------------------------------------
camel-core/src/main/java/org/apache/camel/main/Main.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0d9928a2/camel-core/src/main/java/org/apache/camel/main/Main.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/Main.java b/camel-core/src/main/java/org/apache/camel/main/Main.java
index 7a50785..1a7d74f 100644
--- a/camel-core/src/main/java/org/apache/camel/main/Main.java
+++ b/camel-core/src/main/java/org/apache/camel/main/Main.java
@@ -99,7 +99,11 @@ public class Main extends MainSupport {
}
/**
+ *
* Gets or creates the {@link org.apache.camel.CamelContext} this main class is using.
+ *
+ * It just create a new CamelContextMap per call, please don't use it to access the camel context that will be ran by main.
+ * If you want to setup the CamelContext please use MainListener to get the new created camel context.
*/
public CamelContext getOrCreateCamelContext() {
// force init