You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/21 22:51:10 UTC

[6/6] camel git commit: CAMEL-9899: camel-rx - Use a worker pool for tasks such as stopping consumers

CAMEL-9899: camel-rx - Use a worker pool for tasks such as stopping consumers


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8bff3a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8bff3a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8bff3a0

Branch: refs/heads/camel-2.17.x
Commit: e8bff3a0aef10e3d98cc100bfd065d6a81ca9acf
Parents: 4c97a2c
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 21 22:47:10 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 21 22:50:53 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/ReactiveCamel.java | 27 ++++++++++++++++++--
 .../camel/rx/support/EndpointSubscribeFunc.java |  8 ++++--
 .../camel/rx/support/EndpointSubscription.java  | 21 ++++++++++-----
 3 files changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index d1365bb..1cbe90e 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.rx;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -34,9 +36,30 @@ import rx.functions.Func1;
  */
 public class ReactiveCamel {
     private final CamelContext camelContext;
+    // a worker pool for running tasks, such as stopping consumers, that should not run on the
+    // rx-java event loop thread but on one of our own threads instead
+    private final ExecutorService workerPool;
 
+    /**
+     * Wrap the CamelContext as reactive.
+     * <p/>
+     * Uses a default of 10 as the maximum number of threads in the worker pool used for reactive background tasks.
+     *
+     * @param camelContext  the CamelContext
+     */
     public ReactiveCamel(CamelContext camelContext) {
+        this(camelContext, 10);
+    }
+
+    /**
+     * Wrap the CamelContext as reactive.
+     *
+     * @param camelContext  the CamelContext
+     * @param maxWorkerPoolSize  maximum number of threads in the worker pool used for reactive background tasks
+     */
+    public ReactiveCamel(CamelContext camelContext, int maxWorkerPoolSize) {
         this.camelContext = camelContext;
+        this.workerPool = camelContext.getExecutorServiceManager().newThreadPool(this, "ReactiveCamelWorker", 0, maxWorkerPoolSize);
     }
 
     /**
@@ -133,7 +156,7 @@ public class ReactiveCamel {
      */
     protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
                                                          final Func1<Exchange, T> converter) {
-        Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(endpoint, converter);
+        Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(workerPool, endpoint, converter);
         return new EndpointObservable<T>(endpoint, func);
     }
 
@@ -141,7 +164,7 @@ public class ReactiveCamel {
      * Return a newly created {@link Observable} without conversion
      */
     protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
-        return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(endpoint, new Func1<Exchange, Exchange>() {
+        return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(workerPool, endpoint, new Func1<Exchange, Exchange>() {
             @Override
             public Exchange call(Exchange exchange) {
                 return exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
index df79083..2f5f6dc 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.rx.support;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import rx.Observable;
@@ -24,16 +26,18 @@ import rx.functions.Func1;
 
 public class EndpointSubscribeFunc<T> implements Observable.OnSubscribe<T> {
 
+    private final ExecutorService workerPool;
     private final Endpoint endpoint;
     private final Func1<Exchange, T> converter;
 
-    public EndpointSubscribeFunc(Endpoint endpoint, Func1<Exchange, T> converter) {
+    public EndpointSubscribeFunc(ExecutorService workerPool, Endpoint endpoint, Func1<Exchange, T> converter) {
+        this.workerPool = workerPool;
         this.endpoint = endpoint;
         this.converter = converter;
     }
 
     @Override
     public void call(Subscriber<? super T> subscriber) {
-        subscriber.add(new EndpointSubscription<T>(endpoint, subscriber, converter));
+        subscriber.add(new EndpointSubscription<T>(workerPool, endpoint, subscriber, converter));
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 593e1d4..3c74d81 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.rx.support;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.Consumer;
@@ -37,13 +38,15 @@ public class EndpointSubscription<T> implements Subscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(EndpointSubscription.class);
 
+    private final ExecutorService workerPool;
     private final Endpoint endpoint;
     private final Observer<? super T> observer;
     private Consumer consumer;
     private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
 
-    public EndpointSubscription(Endpoint endpoint, final Observer<? super T> observer,
+    public EndpointSubscription(ExecutorService workerPool, Endpoint endpoint, final Observer<? super T> observer,
                                 final Func1<Exchange, T> func) {
+        this.workerPool = workerPool;
         this.endpoint = endpoint;
         this.observer = observer;
 
@@ -69,11 +72,17 @@ public class EndpointSubscription<T> implements Subscription {
     public void unsubscribe() {
         if (unsubscribed.compareAndSet(false, true)) {
             if (consumer != null) {
-                try {
-                    ServiceHelper.stopServices(consumer);
-                } catch (Exception e) {
-                    LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e);
-                }
+                // must stop the consumer from the worker pool, as a consumer should not be stopped from one of its own threads
+                workerPool.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            ServiceHelper.stopServices(consumer);
+                        } catch (Exception e) {
+                            LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e);
+                        }
+                    }
+                });
             }
         }
     }