You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/21 22:51:10 UTC
[6/6] camel git commit: CAMEL-9899: camel-rx - Use a worker pool for
tasks such as stopping consumers
CAMEL-9899: camel-rx - Use a worker pool for tasks such as stopping consumers
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8bff3a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8bff3a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8bff3a0
Branch: refs/heads/camel-2.17.x
Commit: e8bff3a0aef10e3d98cc100bfd065d6a81ca9acf
Parents: 4c97a2c
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 21 22:47:10 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 21 22:50:53 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/rx/ReactiveCamel.java | 27 ++++++++++++++++++--
.../camel/rx/support/EndpointSubscribeFunc.java | 8 ++++--
.../camel/rx/support/EndpointSubscription.java | 21 ++++++++++-----
3 files changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index d1365bb..1cbe90e 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.rx;
+import java.util.concurrent.ExecutorService;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -34,9 +36,30 @@ import rx.functions.Func1;
*/
public class ReactiveCamel {
private final CamelContext camelContext;
+ // a worker pool for running tasks such as stopping consumers which should not use the event loop
+ // thread from rx-java but use our own thread to process such tasks
+ private final ExecutorService workerPool;
+ /**
+ * Wrap the CamelContext as reactive.
+ * <p/>
+ * Uses a default value of 10 as maximum number of threads in the worker pool used for reactive background tasks.
+ *
+ * @param camelContext the CamelContext
+ */
public ReactiveCamel(CamelContext camelContext) {
+ this(camelContext, 10);
+ }
+
+ /**
+ * Wrap the CamelContext as reactive.
+ *
+ * @param camelContext the CamelContext
+ * @param maxWorkerPoolSize maximum number of threads in the worker pool used for reactive background tasks
+ */
+ public ReactiveCamel(CamelContext camelContext, int maxWorkerPoolSize) {
this.camelContext = camelContext;
+ this.workerPool = camelContext.getExecutorServiceManager().newThreadPool(this, "ReactiveCamelWorker", 0, maxWorkerPoolSize);
}
/**
@@ -133,7 +156,7 @@ public class ReactiveCamel {
*/
protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
final Func1<Exchange, T> converter) {
- Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(endpoint, converter);
+ Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(workerPool, endpoint, converter);
return new EndpointObservable<T>(endpoint, func);
}
@@ -141,7 +164,7 @@ public class ReactiveCamel {
* Return a newly created {@link Observable} without conversion
*/
protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
- return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(endpoint, new Func1<Exchange, Exchange>() {
+ return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(workerPool, endpoint, new Func1<Exchange, Exchange>() {
@Override
public Exchange call(Exchange exchange) {
return exchange;
http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
index df79083..2f5f6dc 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.rx.support;
+import java.util.concurrent.ExecutorService;
+
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import rx.Observable;
@@ -24,16 +26,18 @@ import rx.functions.Func1;
public class EndpointSubscribeFunc<T> implements Observable.OnSubscribe<T> {
+ private final ExecutorService workerPool;
private final Endpoint endpoint;
private final Func1<Exchange, T> converter;
- public EndpointSubscribeFunc(Endpoint endpoint, Func1<Exchange, T> converter) {
+ public EndpointSubscribeFunc(ExecutorService workerPool, Endpoint endpoint, Func1<Exchange, T> converter) {
+ this.workerPool = workerPool;
this.endpoint = endpoint;
this.converter = converter;
}
@Override
public void call(Subscriber<? super T> subscriber) {
- subscriber.add(new EndpointSubscription<T>(endpoint, subscriber, converter));
+ subscriber.add(new EndpointSubscription<T>(workerPool, endpoint, subscriber, converter));
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 593e1d4..3c74d81 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.rx.support;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Consumer;
@@ -37,13 +38,15 @@ public class EndpointSubscription<T> implements Subscription {
private static final Logger LOG = LoggerFactory.getLogger(EndpointSubscription.class);
+ private final ExecutorService workerPool;
private final Endpoint endpoint;
private final Observer<? super T> observer;
private Consumer consumer;
private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
- public EndpointSubscription(Endpoint endpoint, final Observer<? super T> observer,
+ public EndpointSubscription(ExecutorService workerPool, Endpoint endpoint, final Observer<? super T> observer,
final Func1<Exchange, T> func) {
+ this.workerPool = workerPool;
this.endpoint = endpoint;
this.observer = observer;
@@ -69,11 +72,17 @@ public class EndpointSubscription<T> implements Subscription {
public void unsubscribe() {
if (unsubscribed.compareAndSet(false, true)) {
if (consumer != null) {
- try {
- ServiceHelper.stopServices(consumer);
- } catch (Exception e) {
- LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e);
- }
+ // must stop the consumer from the worker pool as we should not stop ourself from a thread from ourself
+ workerPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ServiceHelper.stopServices(consumer);
+ } catch (Exception e) {
+ LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e);
+ }
+ }
+ });
}
}
}