You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/06/12 13:50:23 UTC

[camel] branch camel-13636 created (now 518da60)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git.


      at 518da60  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

This branch includes the following new commits:

     new e81a4ca9 CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new f0d5077  camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 428c2f5  camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new de2b739  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new fefaf8e  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new ac1d295  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 2d5982e  BaseExecutorServiceManager should be abstract
     new 518da60  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/08: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e81a4ca9eff14321cd2057dcda217474b7fdcf8a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 12:44:59 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../java/org/apache/camel/support/ReactiveHelper.java  | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
index 7e05636..06db5e6 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -22,6 +22,9 @@ import org.apache.camel.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A basic reactive engine that uses a worker pool to process tasks.
+ */
 public final class ReactiveHelper {
 
     private static final ThreadLocal<Worker> WORKERS = ThreadLocal.withInitial(Worker::new);
@@ -51,6 +54,10 @@ public final class ReactiveHelper {
         WORKERS.get().schedule(describe(runnable, description), true, false, false);
     }
 
+    /**
+     * @deprecated not in use
+     */
+    @Deprecated
     public static void scheduleLast(Runnable runnable, String description) {
         WORKERS.get().schedule(describe(runnable, description), false, false, false);
     }
@@ -91,9 +98,9 @@ public final class ReactiveHelper {
 
     private static class Worker {
 
-        LinkedList<Runnable> queue = new LinkedList<>();
-        LinkedList<LinkedList<Runnable>> back;
-        boolean running;
+        private volatile LinkedList<Runnable> queue = new LinkedList<>();
+        private volatile LinkedList<LinkedList<Runnable>> back;
+        private volatile boolean running;
 
         public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
             if (main) {
@@ -129,7 +136,7 @@ public final class ReactiveHelper {
 //                            thread.setName(name + " - " + polled.toString());
                             polled.run();
                         } catch (Throwable t) {
-                            t.printStackTrace();
+                            LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
                         }
                     }
                 } finally {
@@ -152,7 +159,8 @@ public final class ReactiveHelper {
                 thread.setName(name + " - " + polled.toString());
                 polled.run();
             } catch (Throwable t) {
-                t.printStackTrace();
+                // should not happen
+                LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
             } finally {
                 thread.setName(name);
             }


[camel] 02/08: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f0d50772af3ab7a4b92a70998d3b682b30c4d425
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 12:54:37 2019 +0200

    camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../main/java/org/apache/camel/CamelContext.java   |  8 +++
 .../org/apache/camel/spi/ReactiveExecutor.java     | 41 +++++++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    | 19 +++++++
 .../camel/impl/engine/DefaultReactiveExecutor.java | 61 ++++++++++++++++++++++
 .../org/apache/camel/impl/DefaultCamelContext.java |  6 +++
 5 files changed, 135 insertions(+)

diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index 48f928a..b260e98 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -37,6 +37,7 @@ import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.PropertiesComponent;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RestRegistry;
@@ -1217,4 +1218,11 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
      */
     void setHeadersMapFactory(HeadersMapFactory factory);
 
+    ReactiveExecutor getReactiveExecutor();
+
+    /**
+     * Sets a custom {@link ReactiveExecutor} to be used.
+     */
+    void setReactiveExecutor(ReactiveExecutor reactiveExecutor);
+
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
new file mode 100644
index 0000000..8987bd3
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * SPI to plug in different reactive engines into the Camel routing engine.
+ */
+public interface ReactiveExecutor {
+
+    // TODO: Add javadoc
+    // TODO: Better name
+
+    void scheduleMain(Runnable runnable);
+
+    void scheduleSync(Runnable runnable);
+
+    void scheduleMain(Runnable runnable, String description);
+
+    void schedule(Runnable runnable);
+
+    void schedule(Runnable runnable, String description);
+
+    void scheduleSync(Runnable runnable, String description);
+
+    boolean executeFromQueue();
+
+}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index aacefac..521a8d6 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -119,6 +119,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RestRegistry;
@@ -216,6 +217,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
 
     private final Object lock = new Object();
     private volatile CamelContextNameStrategy nameStrategy;
+    private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
     private volatile Registry registry;
     private volatile TypeConverter typeConverter;
@@ -3791,6 +3793,21 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         this.headersMapFactory = doAddService(headersMapFactory);
     }
 
+    public ReactiveExecutor getReactiveExecutor() {
+        if (reactiveExecutor == null) {
+            synchronized (lock) {
+                if (reactiveExecutor == null) {
+                    setReactiveExecutor(createReactiveExecutor());
+                }
+            }
+        }
+        return reactiveExecutor;
+    }
+
+    public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) {
+        this.reactiveExecutor = reactiveExecutor;
+    }
+
     @Override
     public DeferServiceFactory getDeferServiceFactory() {
         return deferServiceFactory;
@@ -3869,6 +3886,8 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         }
     }
 
+    protected abstract ReactiveExecutor createReactiveExecutor();
+
     protected abstract StreamCachingStrategy createStreamCachingStrategy();
 
     protected abstract TypeConverter createTypeConverter();
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
new file mode 100644
index 0000000..6a9473b
--- /dev/null
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.spi.ReactiveExecutor;
+import org.apache.camel.support.ReactiveHelper;
+
+/**
+ * Default {@link ReactiveExecutor}.
+ */
+public class DefaultReactiveExecutor implements ReactiveExecutor {
+
+    @Override
+    public void scheduleMain(Runnable runnable) {
+        ReactiveHelper.scheduleMain(runnable);
+    }
+
+    @Override
+    public void scheduleSync(Runnable runnable) {
+        ReactiveHelper.scheduleSync(runnable);
+    }
+
+    @Override
+    public void scheduleMain(Runnable runnable, String description) {
+        ReactiveHelper.scheduleMain(runnable, description);
+    }
+
+    @Override
+    public void schedule(Runnable runnable) {
+        ReactiveHelper.schedule(runnable);
+    }
+
+    @Override
+    public void schedule(Runnable runnable, String description) {
+        ReactiveHelper.schedule(runnable, description);
+    }
+
+    @Override
+    public void scheduleSync(Runnable runnable, String description) {
+        ReactiveHelper.scheduleSync(runnable, description);
+    }
+
+    @Override
+    public boolean executeFromQueue() {
+        return ReactiveHelper.executeFromQueue();
+    }
+}
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 6b7ac7e..1cce2b8 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -45,6 +45,7 @@ import org.apache.camel.impl.engine.DefaultMessageHistoryFactory;
 import org.apache.camel.impl.engine.DefaultNodeIdFactory;
 import org.apache.camel.impl.engine.DefaultPackageScanClassResolver;
 import org.apache.camel.impl.engine.DefaultProcessorFactory;
+import org.apache.camel.impl.engine.DefaultReactiveExecutor;
 import org.apache.camel.impl.engine.DefaultRouteController;
 import org.apache.camel.impl.engine.DefaultShutdownStrategy;
 import org.apache.camel.impl.engine.DefaultStreamCachingStrategy;
@@ -81,6 +82,7 @@ import org.apache.camel.spi.ModelJAXBContextFactory;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RestRegistryFactory;
 import org.apache.camel.spi.RouteController;
@@ -304,4 +306,8 @@ public class DefaultCamelContext extends AbstractModelCamelContext {
     protected StreamCachingStrategy createStreamCachingStrategy() {
         return new DefaultStreamCachingStrategy();
     }
+
+    protected ReactiveExecutor createReactiveExecutor() {
+        return new DefaultReactiveExecutor();
+    }
 }


[camel] 06/08: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ac1d2955bdecabb9ed29f77eda4cd9deb2e06e54
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 14:26:37 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     | 32 +++++++++++---
 .../camel/impl/engine/AbstractCamelContext.java    |  7 +++-
 .../camel/impl/engine/DefaultReactiveExecutor.java | 49 ++++++++++++----------
 .../camel/processor/CamelInternalProcessor.java    |  8 ++--
 4 files changed, 62 insertions(+), 34 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 4a21127..37744fb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -26,20 +26,40 @@ public interface ReactiveExecutor {
     // TODO: Add javadoc
     // TODO: Better name
 
-    void scheduleMain(Runnable runnable);
+    default void schedule(Runnable runnable) {
+        schedule(runnable, null);
+    }
 
-    void scheduleSync(Runnable runnable);
+    void schedule(Runnable runnable, String description);
 
-    void scheduleMain(Runnable runnable, String description);
+    default void scheduleMain(Runnable runnable) {
+        scheduleMain(runnable, null);
+    }
 
-    void schedule(Runnable runnable);
+    void scheduleMain(Runnable runnable, String description);
 
-    void schedule(Runnable runnable, String description);
+    default void scheduleSync(Runnable runnable) {
+        scheduleSync(runnable, null);
+    }
 
     void scheduleSync(Runnable runnable, String description);
 
+    // TODO: Can we make this so we don't need a method on this interface, as it's only used once
     boolean executeFromQueue();
 
-    void callback(AsyncCallback callback);
+    default void callback(AsyncCallback callback) {
+        schedule(new Runnable() {
+
+            @Override
+            public void run() {
+                callback.done(false);
+            }
+
+            @Override
+            public String toString() {
+                return "Callback[" + callback + "]";
+            }
+        });
+    }
 
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 89b1806..20eadae 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2587,8 +2587,9 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
             shutdownServices(notifier);
         }
 
-        // shutdown executor service and management as the last one
+        // shutdown executor service, reactive executor and management as the last one
         shutdownServices(executorServiceManager);
+        shutdownServices(reactiveExecutor);
         shutdownServices(managementStrategy);
         shutdownServices(managementMBeanAssembler);
         shutdownServices(lifecycleStrategies);
@@ -3806,7 +3807,9 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
     }
 
     public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) {
-        this.reactiveExecutor = reactiveExecutor;
+        // special for reactiveExecutor as we want to stop it manually, so
+        // false in stopOnShutdown
+        this.reactiveExecutor = doAddService(reactiveExecutor, false);
     }
 
     @Override
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 3ce7f0a..e094999 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -19,16 +19,17 @@ package org.apache.camel.impl.engine;
 import java.util.LinkedList;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ReactiveExecutor;
+import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ReactiveExecutor}.
  */
-public class DefaultReactiveExecutor implements ReactiveExecutor {
+public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService {
 
-    // TODO: StaticServiceSupport so we can init/start/stop
     // TODO: Add mbean info so we can get details
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class);
@@ -36,33 +37,27 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
     private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new);
 
     @Override
-    public void scheduleMain(Runnable runnable) {
-        workers.get().schedule(runnable, true, true, false);
-    }
-
-    @Override
-    public void scheduleSync(Runnable runnable) {
-        workers.get().schedule(runnable, true, true, true);
-    }
-
-    @Override
     public void scheduleMain(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), true, true, false);
-    }
-
-    @Override
-    public void schedule(Runnable runnable) {
-        workers.get().schedule(runnable, true, false, false);;
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, true, true, false);
     }
 
     @Override
     public void schedule(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), true, false, false);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, true, false, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), false, true, true);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, false, true, true);
     }
 
     @Override
@@ -97,13 +92,23 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
         };
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
     private static class Worker {
 
         private volatile LinkedList<Runnable> queue = new LinkedList<>();
         private volatile LinkedList<LinkedList<Runnable>> back;
         private volatile boolean running;
 
-        public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
+        void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
             if (main) {
                 if (!queue.isEmpty()) {
                     if (back == null) {
@@ -149,7 +154,7 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
             }
         }
 
-        public boolean executeFromQueue() {
+        boolean executeFromQueue() {
             final Runnable polled = queue != null ? queue.poll() : null;
             if (polled == null) {
                 return false;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index f039f27..3651856 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -120,7 +120,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     }
 
     @Override
-    public boolean process(Exchange exchange, AsyncCallback ocallback) {
+    public boolean process(Exchange exchange, AsyncCallback originalCallback) {
         // ----------------------------------------------------------
         // CAMEL END USER - READ ME FOR DEBUGGING TIPS
         // ----------------------------------------------------------
@@ -137,7 +137,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
         if (processor == null || !continueProcessing(exchange)) {
             // no processor or we should not continue then we are done
-            ocallback.done(true);
+            originalCallback.done(true);
             return true;
         }
 
@@ -151,7 +151,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 states[i] = state;
             } catch (Throwable e) {
                 exchange.setException(e);
-                ocallback.done(true);
+                originalCallback.done(true);
                 return true;
             }
         }
@@ -174,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                exchange.getContext().getReactiveExecutor().callback(ocallback);
+                exchange.getContext().getReactiveExecutor().callback(originalCallback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------


[camel] 04/08: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit de2b739c5de471e6ea8af72b94cff12b03f8e013
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 13:57:26 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/impl/engine/AbstractCamelContext.java    |  5 +-
 .../camel/impl/engine/DefaultReactiveExecutor.java |  3 +
 .../impl/engine/ReactiveExecutorResolver.java      | 73 ++++++++++++++++++++++
 .../org/apache/camel/impl/DefaultCamelContext.java |  3 +-
 4 files changed, 81 insertions(+), 3 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 521a8d6..89b1806 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -3151,6 +3151,8 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
     }
 
     protected void doStartEagerServices() {
+        getFactoryFinderResolver();
+        getDefaultFactoryFinder();
         getComponentResolver();
         getDataFormatResolver();
         getManagementStrategy();
@@ -3159,8 +3161,6 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getNodeIdFactory();
         getProcessorFactory();
         getMessageHistoryFactory();
-        getFactoryFinderResolver();
-        getDefaultFactoryFinder();
         getStreamCachingStrategy();
         getModelJAXBContextFactory();
         getUuidGenerator();
@@ -3169,6 +3169,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getBeanProxyFactory();
         getBeanProcessorFactory();
         getBeanPostProcessor();
+        getReactiveExecutor();
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index d700e3c..0448037 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -25,6 +25,9 @@ import org.apache.camel.support.ReactiveHelper;
  */
 public class DefaultReactiveExecutor implements ReactiveExecutor {
 
+    // TODO: ReactiveHelper code should be moved here and not static
+    // ppl should use the SPI interface
+
     @Override
     public void scheduleMain(Runnable runnable) {
         ReactiveHelper.scheduleMain(runnable);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
new file mode 100644
index 0000000..3a78412
--- /dev/null
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.io.IOException;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.spi.FactoryFinder;
+import org.apache.camel.spi.ReactiveExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory resolver to create the {@link org.apache.camel.spi.ReactiveExecutor} to be used.
+ */
+public class ReactiveExecutorResolver {
+
+    public static final String RESOURCE_PATH = "META-INF/services/org/apache/camel/";
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReactiveExecutorResolver.class);
+
+    private FactoryFinder factoryFinder;
+
+    public ReactiveExecutor resolve(CamelContext context) {
+        // use factory finder to find a custom implementation
+        Class<?> type = null;
+        try {
+            type = findFactory("reactive-executor-factory", context);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        if (type != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Found ReactiveExecutor: {} via: {}{}", type.getName(), factoryFinder.getResourcePath(), "reactive-executor-factory");
+            }
+            if (ReactiveExecutor.class.isAssignableFrom(type)) {
+                ReactiveExecutor answer = (ReactiveExecutor) context.getInjector().newInstance(type, false);
+                LOG.debug("Detected and using ReactiveExecutor: {}", answer);
+                return answer;
+            } else {
+                throw new IllegalArgumentException("Type is not a ReactiveExecutor implementation. Found: " + type.getName());
+            }
+        }
+
+        // fallback to default
+        LOG.debug("Creating default ReactiveExecutor");
+        return new DefaultReactiveExecutor();
+    }
+
+    private Class<?> findFactory(String name, CamelContext context) throws ClassNotFoundException, IOException {
+        if (factoryFinder == null) {
+            factoryFinder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(RESOURCE_PATH);
+        }
+        return factoryFinder.findClass(name);
+    }
+
+}
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 1cce2b8..ee0a939 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -53,6 +53,7 @@ import org.apache.camel.impl.engine.DefaultUnitOfWorkFactory;
 import org.apache.camel.impl.engine.DefaultUuidGenerator;
 import org.apache.camel.impl.engine.EndpointKey;
 import org.apache.camel.impl.engine.HeadersMapFactoryResolver;
+import org.apache.camel.impl.engine.ReactiveExecutorResolver;
 import org.apache.camel.impl.engine.RestRegistryFactoryResolver;
 import org.apache.camel.impl.engine.ServicePool;
 import org.apache.camel.impl.engine.WebSpherePackageScanClassResolver;
@@ -308,6 +309,6 @@ public class DefaultCamelContext extends AbstractModelCamelContext {
     }
 
     protected ReactiveExecutor createReactiveExecutor() {
-        return new DefaultReactiveExecutor();
+        return new ReactiveExecutorResolver().resolve(this);
     }
 }


[camel] 05/08: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fefaf8e616daf3c98ee95181e6553f9740ce23ba
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 14:15:24 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/impl/engine/DefaultReactiveExecutor.java | 133 ++++++++++++++++++---
 .../org/apache/camel/support/ReactiveHelper.java   |   1 +
 2 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 0448037..3ce7f0a 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,55 +16,158 @@
  */
 package org.apache.camel.impl.engine;
 
+import java.util.LinkedList;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.spi.ReactiveExecutor;
-import org.apache.camel.support.ReactiveHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ReactiveExecutor}.
  */
 public class DefaultReactiveExecutor implements ReactiveExecutor {
 
-    // TODO: ReactiveHelper code should be moved here and not static
-    // ppl should use the SPI interface
+    // TODO: StaticServiceSupport so we can init/start/stop
+    // TODO: Add mbean info so we can get details
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class);
+
+    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new);
 
     @Override
     public void scheduleMain(Runnable runnable) {
-        ReactiveHelper.scheduleMain(runnable);
+        workers.get().schedule(runnable, true, true, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable) {
-        ReactiveHelper.scheduleSync(runnable);
+        workers.get().schedule(runnable, true, true, true);
     }
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
-        ReactiveHelper.scheduleMain(runnable, description);
+        workers.get().schedule(describe(runnable, description), true, true, false);
     }
 
     @Override
     public void schedule(Runnable runnable) {
-        ReactiveHelper.schedule(runnable);
+        workers.get().schedule(runnable, true, false, false); // flags: first=true, main=false, sync=false
     }
 
     @Override
     public void schedule(Runnable runnable, String description) {
-        ReactiveHelper.schedule(runnable, description);
+        workers.get().schedule(describe(runnable, description), true, false, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
-        ReactiveHelper.scheduleSync(runnable, description);
+        workers.get().schedule(describe(runnable, description), true, true, true); // same flags as scheduleSync(Runnable)
     }
 
     @Override
     public boolean executeFromQueue() {
-        return ReactiveHelper.executeFromQueue();
+        return workers.get().executeFromQueue();
     }
 
     @Override
     public void callback(AsyncCallback callback) {
-        ReactiveHelper.callback(callback);
+        schedule(new Runnable() {
+            @Override
+            public void run() {
+                callback.done(false);
+            }
+            @Override
+            public String toString() {
+                return "Callback[" + callback + "]";
+            }
+        });
     }
+
+    private static Runnable describe(Runnable runnable, String description) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                runnable.run();
+            }
+            @Override
+            public String toString() {
+                return description;
+            }
+        };
+    }
+
+    private static class Worker {
+        // One Worker per thread (obtained via the ThreadLocal above); 'back' parks queues displaced by main tasks.
+        private volatile LinkedList<Runnable> queue = new LinkedList<>();
+        private volatile LinkedList<LinkedList<Runnable>> back;
+        private volatile boolean running;
+        // Enqueues the task (at the head when 'first') and drains the queues unless a drain is already running and 'sync' is false.
+        public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
+            if (main) {
+                if (!queue.isEmpty()) {
+                    if (back == null) {
+                        back = new LinkedList<>();
+                    }
+                    back.push(queue);
+                    queue = new LinkedList<>();
+                }
+            }
+            if (first) {
+                queue.addFirst(runnable);
+            } else {
+                queue.addLast(runnable);
+            }
+            if (!running || sync) {
+                running = true;
+                // drain the current queue, then resume queues parked in 'back' (most recently parked first),
+                // until no work is left; a failing task is logged and does not stop the loop
+                try {
+                    for (;;) {
+                        final Runnable polled = queue.poll();
+                        if (polled == null) {
+                            if (back != null && !back.isEmpty()) {
+                                queue = back.poll();
+                                continue;
+                            } else {
+                                break;
+                            }
+                        }
+                        try {
+                            // the task runs on the calling thread
+                            polled.run();
+                        } catch (Throwable t) {
+                            LOG.warn("Error executing reactive work due to {}. This exception is ignored.", t.getMessage(), t);
+                        }
+                    }
+                } finally {
+                    // always clear the flag so that the next schedule call starts draining again
+                    running = false;
+                }
+            } else {
+                LOG.debug("Queuing reactive work: {}", runnable);
+            }
+        }
+        // Runs at most one task from the current queue, renaming the thread while it runs; returns false when nothing was queued.
+        public boolean executeFromQueue() {
+            final Runnable polled = queue != null ? queue.poll() : null;
+            if (polled == null) {
+                return false;
+            }
+            Thread thread = Thread.currentThread();
+            String name = thread.getName();
+            try {
+                thread.setName(name + " - " + polled.toString());
+                polled.run();
+            } catch (Throwable t) {
+                // should not happen
+                LOG.warn("Error executing reactive work due to {}. This exception is ignored.", t.getMessage(), t);
+            } finally {
+                thread.setName(name);
+            }
+            return true;
+        }
+
+    }
+
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
index 06db5e6..9541371 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A basic reactive engine that uses a worker pool to process tasks.
  */
+@Deprecated
 public final class ReactiveHelper {
 
     private static final ThreadLocal<Worker> WORKERS = ThreadLocal.withInitial(Worker::new);


[camel] 03/08: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 428c2f558a70c85873226095af4ff99770b23237
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 13:01:12 2019 +0200

    camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     |  4 ++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  3 +--
 .../camel/impl/engine/DefaultReactiveExecutor.java |  6 +++++
 .../camel/processor/CamelInternalProcessor.java    |  5 ++--
 .../org/apache/camel/processor/LoopProcessor.java  |  7 +++---
 .../apache/camel/processor/MulticastProcessor.java | 13 +++++------
 .../java/org/apache/camel/processor/Pipeline.java  |  9 ++++----
 .../processor/SharedCamelInternalProcessor.java    |  5 ++--
 .../org/apache/camel/processor/TryProcessor.java   |  5 ++--
 .../processor/aggregate/AggregateProcessor.java    |  3 +--
 .../errorhandler/RedeliveryErrorHandler.java       | 27 +++++++++++-----------
 .../loadbalancer/FailOverLoadBalancer.java         |  5 ++--
 .../processor/loadbalancer/TopicLoadBalancer.java  |  5 ++--
 13 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 8987bd3..4a21127 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.AsyncCallback;
+
 /**
  * SPI to plugin different reactive engines in the Camel routing engine.
  */
@@ -38,4 +40,6 @@ public interface ReactiveExecutor {
 
     boolean executeFromQueue();
 
+    void callback(AsyncCallback callback);
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 3a2b41c..8087942 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -34,7 +34,6 @@ import org.apache.camel.StaticService;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DefaultExchangeFormatter;
 import org.apache.camel.support.service.ServiceSupport;
 
@@ -88,7 +87,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
             if (latch.getCount() <= 0) {
                 return;
             }
-        } while (ReactiveHelper.executeFromQueue());
+        } while (exchange.getContext().getReactiveExecutor().executeFromQueue());
         log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 6a9473b..d700e3c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.support.ReactiveHelper;
 
@@ -58,4 +59,9 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
     public boolean executeFromQueue() {
         return ReactiveHelper.executeFromQueue();
     }
+
+    @Override
+    public void callback(AsyncCallback callback) {
+        ReactiveHelper.callback(callback);
+    }
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 27faa0b..f039f27 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -48,7 +48,6 @@ import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.apache.camel.util.StopWatch;
@@ -175,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                ReactiveHelper.callback(ocallback);
+                exchange.getContext().getReactiveExecutor().callback(ocallback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
@@ -225,7 +224,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            ReactiveHelper.schedule(() -> {
+            exchange.getContext().getReactiveExecutor().schedule(() -> {
                 // execute any after processor work (in current thread, not in the callback)
                 if (uow != null) {
                     uow.afterProcess(processor, exchange, callback, false);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 75329bc..1628c85 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -25,7 +25,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -54,9 +53,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
             LoopState state = new LoopState(exchange, callback);
 
             if (exchange.isTransacted()) {
-                ReactiveHelper.scheduleSync(state);
+                exchange.getContext().getReactiveExecutor().scheduleSync(state);
             } else {
-                ReactiveHelper.scheduleMain(state);
+                exchange.getContext().getReactiveExecutor().scheduleMain(state);
             }
             return false;
 
@@ -113,7 +112,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                     processor.process(current, doneSync -> {
                         // increment counter after done
                         index++;
-                        ReactiveHelper.schedule(this);
+                        exchange.getContext().getReactiveExecutor().schedule(this);
                     });
                 } else {
                     // we are done so prepare the result
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 594c936..4d0806f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -56,7 +56,6 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
@@ -220,12 +219,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
         MulticastState state = new MulticastState(exchange, pairs, callback);
         if (isParallelProcessing()) {
-            executorService.submit(() -> ReactiveHelper.schedule(state));
+            executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state));
         } else {
             if (exchange.isTransacted()) {
-                ReactiveHelper.scheduleSync(state);
+                exchange.getContext().getReactiveExecutor().scheduleSync(state);
             } else {
-                ReactiveHelper.scheduleMain(state);
+                exchange.getContext().getReactiveExecutor().scheduleMain(state);
             }
         }
 
@@ -237,9 +236,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
     protected void schedule(Runnable runnable) {
         if (isParallelProcessing()) {
-            executorService.submit(() -> ReactiveHelper.schedule(runnable));
+            executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable));
         } else {
-            ReactiveHelper.schedule(runnable, "Multicast next step");
+            camelContext.getReactiveExecutor().schedule(runnable, "Multicast next step");
         }
     }
 
@@ -524,7 +523,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
             original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
         }
 
-        ReactiveHelper.callback(callback);
+        camelContext.getReactiveExecutor().callback(callback);
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 077c8de..9ffe248 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -33,7 +33,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -82,10 +81,10 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
                     "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
         } else {
-            ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
                     "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
         }
         return false;
@@ -105,7 +104,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             AsyncProcessor processor = processors.next();
 
             processor.process(exchange, doneSync ->
-                    ReactiveHelper.schedule(() -> doProcess(exchange, callback, processors, false),
+                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false),
                             "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
@@ -115,7 +114,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
             log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
 
-            ReactiveHelper.callback(callback);
+            camelContext.getReactiveExecutor().callback(callback);
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 0cf2654..1a5e92d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -36,7 +36,6 @@ import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -192,7 +191,7 @@ public class SharedCamelInternalProcessor {
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            ReactiveHelper.schedule(() -> {
+            exchange.getContext().getReactiveExecutor().schedule(() -> {
                 // execute any after processor work (in current thread, not in the callback)
                 if (uow != null) {
                     uow.afterProcess(processor, exchange, callback, sync);
@@ -255,7 +254,7 @@ public class SharedCamelInternalProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                ReactiveHelper.callback(callback);
+                exchange.getContext().getReactiveExecutor().callback(callback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index 264ba2b..9b1d50d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -30,7 +30,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 
 /**
@@ -61,7 +60,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
 
-        ReactiveHelper.schedule(new TryState(exchange, callback));
+        exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback));
         return false;
     }
 
@@ -90,7 +89,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
                 Processor processor = processors.next();
                 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
                 log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-                async.process(exchange, doneSync -> ReactiveHelper.schedule(this));
+                async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this));
             } else {
                 ExchangeHelper.prepareOutToIn(exchange);
                 exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 2ccde2e..f4508dd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -64,7 +64,6 @@ import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.LRUCacheFactory;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.NoLock;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
@@ -770,7 +769,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         // send this exchange
         // the call to schedule last if needed to ensure in-order processing of the aggregates
-        executorService.submit(() -> ReactiveHelper.scheduleSync(() -> processor.process(exchange, done -> {
+        executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> {
             // log exception if there was a problem
             if (exchange.getException() != null) {
                 // if there was an exception then let the exception handler handle it
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 782d8a7..2628c67 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -48,7 +48,6 @@ import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DefaultExchangeFormatter;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -153,9 +152,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         RedeliveryState state = new RedeliveryState(exchange, callback);
         // Run it
         if (exchange.isTransacted()) {
-            ReactiveHelper.scheduleSync(state);
+            camelContext.getReactiveExecutor().scheduleSync(state);
         } else {
-            ReactiveHelper.scheduleMain(state);
+            camelContext.getReactiveExecutor().scheduleMain(state);
         }
         return false;
     }
@@ -442,7 +441,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         if (log.isTraceEnabled()) {
                             log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
                         }
-                        executorService.schedule(() -> ReactiveHelper.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
+                        executorService.schedule(() -> camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
 
                     } else {
                         // async delayed redelivery was disabled or we are transacted so we must be synchronous
@@ -458,9 +457,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                                 // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
                                 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
                                 // jump to start of loop which then detects that we are failed and exhausted
-                                ReactiveHelper.schedule(this);
+                                camelContext.getReactiveExecutor().schedule(this);
                             } else {
-                                ReactiveHelper.schedule(this::redeliver);
+                                camelContext.getReactiveExecutor().schedule(this::redeliver);
                             }
                         } catch (InterruptedException e) {
                             redeliverySleepCounter.decrementAndGet();
@@ -469,12 +468,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                             // mark the exchange to stop continue routing when interrupted
                             // as we do not want to continue routing (for example a task has been cancelled)
                             exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
-                            ReactiveHelper.callback(callback);
+                            camelContext.getReactiveExecutor().callback(callback);
                         }
                     }
                 } else {
                     // execute the task immediately
-                    ReactiveHelper.schedule(this::redeliver);
+                    camelContext.getReactiveExecutor().schedule(this::redeliver);
                 }
             } else {
                 // Simple delivery
@@ -482,10 +481,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
                     if (isDone(exchange)) {
-                        ReactiveHelper.callback(callback);
+                        camelContext.getReactiveExecutor().callback(callback);
                     } else {
                         // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                        ReactiveHelper.schedule(this);
+                        camelContext.getReactiveExecutor().schedule(this);
                     }
                 });
             }
@@ -563,11 +562,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    ReactiveHelper.callback(callback);
+                    camelContext.getReactiveExecutor().callback(callback);
                     return;
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                    ReactiveHelper.schedule(this);
+                    camelContext.getReactiveExecutor().schedule(this);
                 }
             });
         }
@@ -845,7 +844,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
                     } finally {
                         // if the fault was handled asynchronously, this should be reflected in the callback as well
-                        ReactiveHelper.callback(callback);
+                        camelContext.getReactiveExecutor().callback(callback);
                     }
                 });
             } else {
@@ -864,7 +863,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
                 } finally {
                     // callback we are done
-                    ReactiveHelper.callback(callback);
+                    camelContext.getReactiveExecutor().callback(callback);
                 }
             }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 059c795..89a6f91 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -27,7 +27,6 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Traceable;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -159,7 +158,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -246,7 +245,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
 
             // process the exchange
             log.debug("Processing failover at attempt {} for {}", attempts, copy);
-            processor.process(copy, doneSync -> ReactiveHelper.schedule(this::run));
+            processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run));
         }
 
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
index 09ba098..ebd0f87 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.support.ReactiveHelper;
 
 /**
  * A {@link LoadBalancer} implementations which sends to all destinations
@@ -33,7 +32,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -64,7 +63,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
                 exchange.setException(current.getException());
                 callback.done(false);
             } else {
-                ReactiveHelper.schedule(this::run);
+                exchange.getContext().getReactiveExecutor().schedule(this::run);
             }
         }
     }


[camel] 07/08: BaseExecutorServiceManager should be abstract

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2d5982ed4e85cb243b29e2a6a835b1b459ebfaa6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 14:35:58 2019 +0200

    BaseExecutorServiceManager should be abstract
---
 .../org/apache/camel/impl/engine/BaseExecutorServiceManager.java     | 5 ++---
 .../java/org/apache/camel/impl/DefaultExecutorServiceManager.java    | 3 +++
 .../org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java    | 3 ++-
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
index 67c4e56..5dfd1e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
@@ -52,10 +52,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Default {@link org.apache.camel.spi.ExecutorServiceManager}.
- *
+ * Base {@link org.apache.camel.spi.ExecutorServiceManager} which concrete implementations can extend.
  */
-public class BaseExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
+public abstract class BaseExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
     private static final Logger LOG = LoggerFactory.getLogger(BaseExecutorServiceManager.class);
 
     private final CamelContext camelContext;
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
index b9eb1a1..a75bcbc 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
@@ -26,6 +26,9 @@ import org.apache.camel.model.OptionalIdentifiedDefinition;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
 
+/**
+ * Default {@link org.apache.camel.spi.ExecutorServiceManager}.
+ */
 public class DefaultExecutorServiceManager extends BaseExecutorServiceManager {
 
     public DefaultExecutorServiceManager(CamelContext camelContext) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
index f9995a0..59de11f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.impl.DefaultExecutorServiceManager;
 import org.apache.camel.support.DefaultThreadPoolFactory;
 import org.junit.Test;
 
@@ -36,7 +37,7 @@ public class CustomThreadPoolFactoryTest extends ContextTestSupport {
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
-        BaseExecutorServiceManager executorServiceManager = new BaseExecutorServiceManager(context);
+        DefaultExecutorServiceManager executorServiceManager = new DefaultExecutorServiceManager(context);
         executorServiceManager.setThreadPoolFactory(factory);
         context.setExecutorServiceManager(executorServiceManager);
         return context;


[camel] 08/08: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 518da60891cb33edae76746e3311e1ffe43e547c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 15:35:37 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/impl/engine/AbstractCamelContext.java    |  2 +-
 .../camel/impl/engine/DefaultReactiveExecutor.java | 47 ++++++++++++++-
 .../camel/impl/MultipleLifecycleStrategyTest.java  |  2 +-
 core/camel-management-impl/pom.xml                 | 16 +++++
 .../ManagedDefaultReactiveExecutorTest.java        | 70 ++++++++++++++++++++++
 .../management/ManagedNonManagedServiceTest.java   |  4 +-
 ...edProducerRouteAddRemoveRegisterAlwaysTest.java |  2 +-
 .../management/ManagedRouteAddRemoveTest.java      |  2 +-
 8 files changed, 137 insertions(+), 8 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 20eadae..de47b59 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -3142,6 +3142,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getProducerServicePool();
         getPollingConsumerServicePool();
         getRestRegistryFactory();
+        getReactiveExecutor();
 
         if (isTypeConverterStatisticsEnabled() != null) {
             getTypeConverterRegistry().getStatistics().setStatisticsEnabled(isTypeConverterStatisticsEnabled());
@@ -3170,7 +3171,6 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getBeanProxyFactory();
         getBeanProcessorFactory();
         getBeanPostProcessor();
-        getReactiveExecutor();
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index e094999..350e189 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -17,9 +17,14 @@
 package org.apache.camel.impl.engine;
 
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.StaticService;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
@@ -28,13 +33,23 @@ import org.slf4j.LoggerFactory;
 /**
  * Default {@link ReactiveExecutor}.
  */
+@ManagedResource(description = "Managed ReactiveExecutor")
 public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService {
 
-    // TODO: Add mbean info so we can get details
-
     private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class);
 
-    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new);
+    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(new Supplier<Worker>() {
+        @Override
+        public Worker get() {
+            createdWorkers.incrementAndGet();
+            return new Worker(DefaultReactiveExecutor.this);
+        }
+    });
+
+    // used for statistics so we have insights at runtime
+    private final AtomicInteger createdWorkers = new AtomicInteger();
+    private final AtomicInteger runningWorkers = new AtomicInteger();
+    private final AtomicLong pendingTasks = new AtomicLong();
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
@@ -65,6 +80,21 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
         return workers.get().executeFromQueue();
     }
 
+    @ManagedAttribute(description = "Number of created workers")
+    public int getCreatedWorkers() {
+        return createdWorkers.get();
+    }
+
+    @ManagedAttribute(description = "Number of running workers")
+    public int getRunningWorkers() {
+        return runningWorkers.get();
+    }
+
+    @ManagedAttribute(description = "Number of pending tasks")
+    public long getPendingTasks() {
+        return pendingTasks.get();
+    }
+
     @Override
     public void callback(AsyncCallback callback) {
         schedule(new Runnable() {
@@ -104,10 +134,15 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
 
     private static class Worker {
 
+        private final DefaultReactiveExecutor executor;
         private volatile LinkedList<Runnable> queue = new LinkedList<>();
         private volatile LinkedList<LinkedList<Runnable>> back;
         private volatile boolean running;
 
+        public Worker(DefaultReactiveExecutor executor) {
+            this.executor = executor;
+        }
+
         void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
             if (main) {
                 if (!queue.isEmpty()) {
@@ -120,11 +155,14 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
             }
             if (first) {
                 queue.addFirst(runnable);
+                executor.pendingTasks.incrementAndGet();
             } else {
                 queue.addLast(runnable);
+                executor.pendingTasks.incrementAndGet();
             }
             if (!running || sync) {
                 running = true;
+                executor.runningWorkers.incrementAndGet();
 //                Thread thread = Thread.currentThread();
 //                String name = thread.getName();
                 try {
@@ -139,6 +177,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                             }
                         }
                         try {
+                            executor.pendingTasks.decrementAndGet();
 //                            thread.setName(name + " - " + polled.toString());
                             polled.run();
                         } catch (Throwable t) {
@@ -148,6 +187,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                 } finally {
 //                    thread.setName(name);
                     running = false;
+                    executor.runningWorkers.decrementAndGet();
                 }
             } else {
                 LOG.debug("Queuing reactive work: {}", runnable);
@@ -162,6 +202,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
             Thread thread = Thread.currentThread();
             String name = thread.getName();
             try {
+                executor.pendingTasks.decrementAndGet();
                 thread.setName(name + " - " + polled.toString());
                 polled.run();
             } catch (Throwable t) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
index caffd55..4c0ffc8 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
@@ -52,7 +52,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport {
         List<String> expectedEvents = Arrays.asList("onContextStart",
             "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
             "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
-            "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
+            "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
         
         assertEquals(expectedEvents, dummy1.getEvents());
         assertEquals(expectedEvents, dummy2.getEvents());
diff --git a/core/camel-management-impl/pom.xml b/core/camel-management-impl/pom.xml
index 3d6fef6..58aca2f 100644
--- a/core/camel-management-impl/pom.xml
+++ b/core/camel-management-impl/pom.xml
@@ -109,6 +109,22 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <!-- logging for testing -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
new file mode 100644
index 0000000..a6cb317
--- /dev/null
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class ManagedDefaultReactiveExecutorTest extends ManagementTestSupport {
+
+    @Test
+    public void testReactiveExecutor() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                    .to("log:foo")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // check mbeans
+                            MBeanServer mbeanServer = getMBeanServer();
+
+                            ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultReactiveExecutor");
+                            assertTrue("Should be registered", mbeanServer.isRegistered(on));
+
+                            // should be 1 running
+                            Integer running = (Integer) mbeanServer.getAttribute(on, "RunningWorkers");
+                            assertEquals(1, running.intValue());
+
+                            // should be 0 pending
+                            Long pending = (Long) mbeanServer.getAttribute(on, "PendingTasks");
+                            assertEquals(0, pending.intValue());
+                        }
+                    })
+                    .to("log:bar")
+                    .to("mock:result");
+            }
+        };
+    }
+
+
+}
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
index 6962165..3a249b5 100644
--- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 
 public class ManagedNonManagedServiceTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Test
     public void testService() throws Exception {
@@ -38,6 +38,8 @@ public class ManagedNonManagedServiceTest extends ManagementTestSupport {
             return;
         }
 
+        template.sendBody("direct:start", "Hello World");
+
         // must enable always as CamelContext has been started
         // and we add the service manually below
         context.getManagementStrategy().getManagementAgent().setRegisterAlways(true);
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index 6c7cd92..4bb974c 100644
--- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementTestSupport {
 
-    private int services = 11;
+    private int services = 12;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index fff6e2d..339ec5f 100644
--- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -33,7 +33,7 @@ import org.junit.Test;
  */
 public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
     
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {