You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/06/20 06:14:30 UTC

[camel] branch master updated (ff8aabb -> 06beeba)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from ff8aabb  Revert "CAMEL-13660:xml-specs - Can we upgrade stax2-api from 3.x to 4.x"
     new 87af297  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new e1b778d  camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 3e324d4  camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new f96f3ab  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new d6a47b2  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 7c05330  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 0857c76  BaseExecutorServiceManager should be abstract
     new 35805f9  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new c926451  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 344d548  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 26a5457  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 611590e  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 86d23b5  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 88942d0  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new cb3ceba  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new dee4f9b  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 0a181b3  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
     new 06beeba  CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

The 18 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 apache-camel/pom.xml                               |  16 +-
 apache-camel/src/main/descriptors/common-bin.xml   |   3 +
 bom/camel-bom/pom.xml                              |   5 +
 .../camel-reactive-executor-vertx}/pom.xml         |  29 ++-
 .../src/main/docs/reactive-executor-vertx.adoc     |  46 +++++
 .../reactive/vertx/VertXReactiveExecutor.java      | 117 +++++++++++
 .../services/org/apache/camel/reactive-executor    |  18 ++
 .../org/apache/camel/reactive/SimpleMockTest.java  |  65 ++++++
 .../src/test/resources/log4j2.properties           |  31 +++
 components/pom.xml                                 |   1 +
 components/readme.adoc                             |   4 +-
 .../main/java/org/apache/camel/CamelContext.java   |   8 +
 .../org/apache/camel/spi/ReactiveExecutor.java     | 104 ++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  38 +++-
 .../impl/engine/BaseExecutorServiceManager.java    |   5 +-
 .../engine/DefaultAsyncProcessorAwaitManager.java  |   4 +-
 .../camel/impl/engine/DefaultReactiveExecutor.java | 228 +++++++++++++++++++++
 .../impl/engine/ReactiveExecutorResolver.java      |  73 +++++++
 .../camel/processor/CamelInternalProcessor.java    |  11 +-
 .../org/apache/camel/processor/LoopProcessor.java  |   7 +-
 .../apache/camel/processor/MulticastProcessor.java |  13 +-
 .../java/org/apache/camel/processor/Pipeline.java  |   9 +-
 .../processor/SharedCamelInternalProcessor.java    |  13 +-
 .../org/apache/camel/processor/TryProcessor.java   |   5 +-
 .../processor/aggregate/AggregateProcessor.java    |   3 +-
 .../errorhandler/RedeliveryErrorHandler.java       |  27 ++-
 .../loadbalancer/FailOverLoadBalancer.java         |   5 +-
 .../processor/loadbalancer/TopicLoadBalancer.java  |   5 +-
 core/camel-caffeine-lrucache/pom.xml               |   2 +-
 .../core/xml/AbstractCamelContextFactoryBean.java  |   6 +
 .../org/apache/camel/impl/DefaultCamelContext.java |   7 +
 .../camel/impl/DefaultExecutorServiceManager.java  |   3 +
 .../camel/impl/MultipleLifecycleStrategyTest.java  |   2 +-
 .../impl/engine/CustomThreadPoolFactoryTest.java   |   3 +-
 core/camel-headersmap/pom.xml                      |   2 +-
 .../camel-headersmap/src/main/docs/headersmap.adoc |   4 +-
 core/camel-jaxp/pom.xml                            |   6 +-
 .../camel/main/DefaultConfigurationConfigurer.java |  44 ++--
 core/camel-management-impl/pom.xml                 |  16 ++
 .../ManagedDefaultReactiveExecutorTest.java        |  70 +++++++
 .../management/ManagedNonManagedServiceTest.java   |   4 +-
 ...edProducerRouteAddRemoveRegisterAlwaysTest.java |   2 +-
 .../management/ManagedRouteAddRemoveTest.java      |   2 +-
 .../org/apache/camel/support/ReactiveHelper.java   |  19 +-
 docs/components/modules/ROOT/nav.adoc              |   1 +
 .../ROOT/pages/reactive-executor-vertx.adoc        |  46 +++++
 examples/README.adoc                               |   2 +
 .../camel-example-reactive-executor-vertx}/pom.xml |  65 +++---
 .../readme.adoc                                    |  15 ++
 .../org/apache/camel/example/MyApplication.java    |  38 ++++
 .../org/apache/camel/example/MyRouteBuilder.java   |  30 +++
 .../src/main/resources/application.properties      |  28 +++
 .../src/main}/resources/log4j2.properties          |   0
 examples/pom.xml                                   |   1 +
 parent/pom.xml                                     |  10 +
 .../camel-reactive-executor-vertx-starter/pom.xml  |  53 +++++
 .../src/main/resources/META-INF/LICENSE.txt        |   0
 .../src/main/resources/META-INF/NOTICE.txt         |   0
 .../src/main/resources/META-INF/spring.provides    |  17 ++
 platforms/spring-boot/components-starter/pom.xml   |   1 +
 .../camel-spring-boot-dependencies/pom.xml         |   5 +
 61 files changed, 1252 insertions(+), 145 deletions(-)
 copy {core/camel-headersmap => components/camel-reactive-executor-vertx}/pom.xml (79%)
 create mode 100644 components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
 create mode 100644 components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
 create mode 100644 components/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
 create mode 100644 components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
 create mode 100644 components/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
 create mode 100644 core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
 create mode 100644 core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
 create mode 100644 core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
 create mode 100644 core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
 create mode 100644 docs/components/modules/ROOT/pages/reactive-executor-vertx.adoc
 copy {core/camel-headersmap => examples/camel-example-reactive-executor-vertx}/pom.xml (61%)
 create mode 100644 examples/camel-example-reactive-executor-vertx/readme.adoc
 create mode 100644 examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyApplication.java
 create mode 100644 examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyRouteBuilder.java
 create mode 100644 examples/camel-example-reactive-executor-vertx/src/main/resources/application.properties
 copy {tests/camel-itest-osgi/src/test => examples/camel-example-reactive-executor-vertx/src/main}/resources/log4j2.properties (100%)
 create mode 100644 platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/pom.xml
 copy {tooling/maven/camel-package-maven-plugin => platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter}/src/main/resources/META-INF/LICENSE.txt (100%)
 copy {tooling/maven/camel-package-maven-plugin => platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter}/src/main/resources/META-INF/NOTICE.txt (100%)
 create mode 100644 platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/spring.provides


[camel] 01/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 87af297b929247891b9896e310d9a812300a7696
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 12:44:59 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../java/org/apache/camel/support/ReactiveHelper.java  | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
index 7e05636..06db5e6 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -22,6 +22,9 @@ import org.apache.camel.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A basic reactive engine that uses a worker pool to process tasks.
+ */
 public final class ReactiveHelper {
 
     private static final ThreadLocal<Worker> WORKERS = ThreadLocal.withInitial(Worker::new);
@@ -51,6 +54,10 @@ public final class ReactiveHelper {
         WORKERS.get().schedule(describe(runnable, description), true, false, false);
     }
 
+    /**
+     * @deprecated not in use
+     */
+    @Deprecated
     public static void scheduleLast(Runnable runnable, String description) {
         WORKERS.get().schedule(describe(runnable, description), false, false, false);
     }
@@ -91,9 +98,9 @@ public final class ReactiveHelper {
 
     private static class Worker {
 
-        LinkedList<Runnable> queue = new LinkedList<>();
-        LinkedList<LinkedList<Runnable>> back;
-        boolean running;
+        private volatile LinkedList<Runnable> queue = new LinkedList<>();
+        private volatile LinkedList<LinkedList<Runnable>> back;
+        private volatile boolean running;
 
         public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
             if (main) {
@@ -129,7 +136,7 @@ public final class ReactiveHelper {
 //                            thread.setName(name + " - " + polled.toString());
                             polled.run();
                         } catch (Throwable t) {
-                            t.printStackTrace();
+                            LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
                         }
                     }
                 } finally {
@@ -152,7 +159,8 @@ public final class ReactiveHelper {
                 thread.setName(name + " - " + polled.toString());
                 polled.run();
             } catch (Throwable t) {
-                t.printStackTrace();
+                // should not happen
+                LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
             } finally {
                 thread.setName(name);
             }


[camel] 03/18: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3e324d4eae093c00ba390ece7c8d08a8e34bfb99
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 13:01:12 2019 +0200

    camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     |  4 ++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  3 +--
 .../camel/impl/engine/DefaultReactiveExecutor.java |  6 +++++
 .../camel/processor/CamelInternalProcessor.java    |  5 ++--
 .../org/apache/camel/processor/LoopProcessor.java  |  7 +++---
 .../apache/camel/processor/MulticastProcessor.java | 13 +++++------
 .../java/org/apache/camel/processor/Pipeline.java  |  9 ++++----
 .../processor/SharedCamelInternalProcessor.java    |  5 ++--
 .../org/apache/camel/processor/TryProcessor.java   |  5 ++--
 .../processor/aggregate/AggregateProcessor.java    |  3 +--
 .../errorhandler/RedeliveryErrorHandler.java       | 27 +++++++++++-----------
 .../loadbalancer/FailOverLoadBalancer.java         |  5 ++--
 .../processor/loadbalancer/TopicLoadBalancer.java  |  5 ++--
 13 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 8987bd3..4a21127 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.AsyncCallback;
+
 /**
  * SPI to plugin different reactive engines in the Camel routing engine.
  */
@@ -38,4 +40,6 @@ public interface ReactiveExecutor {
 
     boolean executeFromQueue();
 
+    void callback(AsyncCallback callback);
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 3a2b41c..8087942 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -34,7 +34,6 @@ import org.apache.camel.StaticService;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DefaultExchangeFormatter;
 import org.apache.camel.support.service.ServiceSupport;
 
@@ -88,7 +87,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
             if (latch.getCount() <= 0) {
                 return;
             }
-        } while (ReactiveHelper.executeFromQueue());
+        } while (exchange.getContext().getReactiveExecutor().executeFromQueue());
         log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 6a9473b..d700e3c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.support.ReactiveHelper;
 
@@ -58,4 +59,9 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
     public boolean executeFromQueue() {
         return ReactiveHelper.executeFromQueue();
     }
+
+    @Override
+    public void callback(AsyncCallback callback) {
+        ReactiveHelper.callback(callback);
+    }
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 27faa0b..f039f27 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -48,7 +48,6 @@ import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.apache.camel.util.StopWatch;
@@ -175,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                ReactiveHelper.callback(ocallback);
+                exchange.getContext().getReactiveExecutor().callback(ocallback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
@@ -225,7 +224,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            ReactiveHelper.schedule(() -> {
+            exchange.getContext().getReactiveExecutor().schedule(() -> {
                 // execute any after processor work (in current thread, not in the callback)
                 if (uow != null) {
                     uow.afterProcess(processor, exchange, callback, false);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 75329bc..1628c85 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -25,7 +25,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -54,9 +53,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
             LoopState state = new LoopState(exchange, callback);
 
             if (exchange.isTransacted()) {
-                ReactiveHelper.scheduleSync(state);
+                exchange.getContext().getReactiveExecutor().scheduleSync(state);
             } else {
-                ReactiveHelper.scheduleMain(state);
+                exchange.getContext().getReactiveExecutor().scheduleMain(state);
             }
             return false;
 
@@ -113,7 +112,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                     processor.process(current, doneSync -> {
                         // increment counter after done
                         index++;
-                        ReactiveHelper.schedule(this);
+                        exchange.getContext().getReactiveExecutor().schedule(this);
                     });
                 } else {
                     // we are done so prepare the result
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 594c936..4d0806f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -56,7 +56,6 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
@@ -220,12 +219,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
         MulticastState state = new MulticastState(exchange, pairs, callback);
         if (isParallelProcessing()) {
-            executorService.submit(() -> ReactiveHelper.schedule(state));
+            executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state));
         } else {
             if (exchange.isTransacted()) {
-                ReactiveHelper.scheduleSync(state);
+                exchange.getContext().getReactiveExecutor().scheduleSync(state);
             } else {
-                ReactiveHelper.scheduleMain(state);
+                exchange.getContext().getReactiveExecutor().scheduleMain(state);
             }
         }
 
@@ -237,9 +236,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
     protected void schedule(Runnable runnable) {
         if (isParallelProcessing()) {
-            executorService.submit(() -> ReactiveHelper.schedule(runnable));
+            executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable));
         } else {
-            ReactiveHelper.schedule(runnable, "Multicast next step");
+            camelContext.getReactiveExecutor().schedule(runnable, "Multicast next step");
         }
     }
 
@@ -524,7 +523,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
             original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
         }
 
-        ReactiveHelper.callback(callback);
+        camelContext.getReactiveExecutor().callback(callback);
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 077c8de..9ffe248 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -33,7 +33,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -82,10 +81,10 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
                     "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
         } else {
-            ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
                     "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
         }
         return false;
@@ -105,7 +104,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             AsyncProcessor processor = processors.next();
 
             processor.process(exchange, doneSync ->
-                    ReactiveHelper.schedule(() -> doProcess(exchange, callback, processors, false),
+                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false),
                             "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
@@ -115,7 +114,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
             log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
 
-            ReactiveHelper.callback(callback);
+            camelContext.getReactiveExecutor().callback(callback);
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 0cf2654..1a5e92d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -36,7 +36,6 @@ import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -192,7 +191,7 @@ public class SharedCamelInternalProcessor {
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            ReactiveHelper.schedule(() -> {
+            exchange.getContext().getReactiveExecutor().schedule(() -> {
                 // execute any after processor work (in current thread, not in the callback)
                 if (uow != null) {
                     uow.afterProcess(processor, exchange, callback, sync);
@@ -255,7 +254,7 @@ public class SharedCamelInternalProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                ReactiveHelper.callback(callback);
+                exchange.getContext().getReactiveExecutor().callback(callback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index 264ba2b..9b1d50d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -30,7 +30,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 
 /**
@@ -61,7 +60,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
 
-        ReactiveHelper.schedule(new TryState(exchange, callback));
+        exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback));
         return false;
     }
 
@@ -90,7 +89,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
                 Processor processor = processors.next();
                 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
                 log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-                async.process(exchange, doneSync -> ReactiveHelper.schedule(this));
+                async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this));
             } else {
                 ExchangeHelper.prepareOutToIn(exchange);
                 exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 2ccde2e..f4508dd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -64,7 +64,6 @@ import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.LRUCacheFactory;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.NoLock;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
@@ -770,7 +769,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         // send this exchange
         // the call to schedule last if needed to ensure in-order processing of the aggregates
-        executorService.submit(() -> ReactiveHelper.scheduleSync(() -> processor.process(exchange, done -> {
+        executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> {
             // log exception if there was a problem
             if (exchange.getException() != null) {
                 // if there was an exception then let the exception handler handle it
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 782d8a7..2628c67 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -48,7 +48,6 @@ import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DefaultExchangeFormatter;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -153,9 +152,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         RedeliveryState state = new RedeliveryState(exchange, callback);
         // Run it
         if (exchange.isTransacted()) {
-            ReactiveHelper.scheduleSync(state);
+            camelContext.getReactiveExecutor().scheduleSync(state);
         } else {
-            ReactiveHelper.scheduleMain(state);
+            camelContext.getReactiveExecutor().scheduleMain(state);
         }
         return false;
     }
@@ -442,7 +441,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         if (log.isTraceEnabled()) {
                             log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
                         }
-                        executorService.schedule(() -> ReactiveHelper.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
+                        executorService.schedule(() -> camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
 
                     } else {
                         // async delayed redelivery was disabled or we are transacted so we must be synchronous
@@ -458,9 +457,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                                 // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
                                 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
                                 // jump to start of loop which then detects that we are failed and exhausted
-                                ReactiveHelper.schedule(this);
+                                camelContext.getReactiveExecutor().schedule(this);
                             } else {
-                                ReactiveHelper.schedule(this::redeliver);
+                                camelContext.getReactiveExecutor().schedule(this::redeliver);
                             }
                         } catch (InterruptedException e) {
                             redeliverySleepCounter.decrementAndGet();
@@ -469,12 +468,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                             // mark the exchange to stop continue routing when interrupted
                             // as we do not want to continue routing (for example a task has been cancelled)
                             exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
-                            ReactiveHelper.callback(callback);
+                            camelContext.getReactiveExecutor().callback(callback);
                         }
                     }
                 } else {
                     // execute the task immediately
-                    ReactiveHelper.schedule(this::redeliver);
+                    camelContext.getReactiveExecutor().schedule(this::redeliver);
                 }
             } else {
                 // Simple delivery
@@ -482,10 +481,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
                     if (isDone(exchange)) {
-                        ReactiveHelper.callback(callback);
+                        camelContext.getReactiveExecutor().callback(callback);
                     } else {
                         // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                        ReactiveHelper.schedule(this);
+                        camelContext.getReactiveExecutor().schedule(this);
                     }
                 });
             }
@@ -563,11 +562,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    ReactiveHelper.callback(callback);
+                    camelContext.getReactiveExecutor().callback(callback);
                     return;
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                    ReactiveHelper.schedule(this);
+                    camelContext.getReactiveExecutor().schedule(this);
                 }
             });
         }
@@ -845,7 +844,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
                     } finally {
                         // if the fault was handled asynchronously, this should be reflected in the callback as well
-                        ReactiveHelper.callback(callback);
+                        camelContext.getReactiveExecutor().callback(callback);
                     }
                 });
             } else {
@@ -864,7 +863,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
                 } finally {
                     // callback we are done
-                    ReactiveHelper.callback(callback);
+                    camelContext.getReactiveExecutor().callback(callback);
                 }
             }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 059c795..89a6f91 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -27,7 +27,6 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Traceable;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -159,7 +158,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -246,7 +245,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
 
             // process the exchange
             log.debug("Processing failover at attempt {} for {}", attempts, copy);
-            processor.process(copy, doneSync -> ReactiveHelper.schedule(this::run));
+            processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run));
         }
 
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
index 09ba098..ebd0f87 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.support.ReactiveHelper;
 
 /**
  * A {@link LoadBalancer} implementations which sends to all destinations
@@ -33,7 +32,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -64,7 +63,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
                 exchange.setException(current.getException());
                 callback.done(false);
             } else {
-                ReactiveHelper.schedule(this::run);
+                exchange.getContext().getReactiveExecutor().schedule(this::run);
             }
         }
     }


[camel] 14/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 88942d066f6d19e49a464fa7c461daa42c16d5f8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 11:04:29 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel-headersmap/src/main/docs/headersmap.adoc |  4 +-
 core/camel-reactive-executor-vertx/pom.xml         |  2 +
 .../src/main/docs/reactive-executor-vertx.adoc     | 44 ++++++++++++++++++++++
 .../reactive/vertx/VertXReactiveExecutor.java      | 25 ++++++++++--
 examples/README.adoc                               |  2 +
 examples/camel-example-main/pom.xml                |  4 --
 .../pom.xml                                        | 13 ++-----
 .../readme.adoc                                    | 15 ++++++++
 .../org/apache/camel/example/MyApplication.java    | 38 +++++++++++++++++++
 .../org/apache/camel/example/MyRouteBuilder.java   | 30 +++++++++++++++
 .../src/main/resources/application.properties      | 28 ++++++++++++++
 .../src/main/resources/log4j2.properties           | 23 +++++++++++
 examples/pom.xml                                   |  1 +
 13 files changed, 209 insertions(+), 20 deletions(-)

diff --git a/core/camel-headersmap/src/main/docs/headersmap.adoc b/core/camel-headersmap/src/main/docs/headersmap.adoc
index 6947bc1..b7d8759 100644
--- a/core/camel-headersmap/src/main/docs/headersmap.adoc
+++ b/core/camel-headersmap/src/main/docs/headersmap.adoc
@@ -15,11 +15,9 @@ and Camel should auto-detect this on startup and log as follows:
 Detected and using custom HeadersMapFactory: org.apache.camel.component.headersmap.FastHeadersMapFactory@71e9ebae
 ----
 
-For spring-boot there is a `camel-headersmap-starter` dependency you should use.
-
 === Manual enabling
 
-If you use OSGi or the implementation is not added to the classpath, you need to enable this explict such:
+If you use OSGi or the implementation is not added to the classpath, you need to enable this explicitly, such as:
 
 [source,java]
 ----
diff --git a/core/camel-reactive-executor-vertx/pom.xml b/core/camel-reactive-executor-vertx/pom.xml
index 0ae3475..d76289a 100644
--- a/core/camel-reactive-executor-vertx/pom.xml
+++ b/core/camel-reactive-executor-vertx/pom.xml
@@ -27,6 +27,8 @@
         <version>3.0.0-SNAPSHOT</version>
     </parent>
 
+    <!-- TODO: move to components -->
+
     <artifactId>camel-reactive-executor-vertx</artifactId>
     <packaging>jar</packaging>
     <name>Camel :: Reactive Executor :: Vert X</name>
diff --git a/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc b/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
new file mode 100644
index 0000000..e541df4
--- /dev/null
+++ b/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
@@ -0,0 +1,44 @@
+== ReactiveExecutor VertX
+
+*Available as of Camel 3.0*
+
+The camel-reactive-executor-vertx is a VertX based implementation of the `ReactiveExecutor` SPI.
+
+By default Camel uses its own reactive engine for routing messages, but you can plug in
+different engines via a SPI interface. This is a VertX based plugin that uses the VertX event loop
+for processing messages during routing.
+
+=== VertX instance
+
+This implementation will by default create a default `io.vertx.core.Vertx` instance to be used.
+However you can configure an existing instance using the getter/setter on the `VertXReactiveExecutor` class.
+
+=== Auto detection from classpath
+
+To use this implementation all you need to do is to add the `camel-reactive-executor-vertx` dependency to the classpath,
+and Camel should auto-detect this on startup and log as follows:
+
+[source,text]
+----
+Using ReactiveExecutor: org.apache.camel.reactive.vertx.VertXReactiveExecutor@2a62b5bc
+----
+
+=== Manual enabling
+
+If you use OSGi or the implementation is not added to the classpath, you need to enable this explicitly, such as:
+
+[source,java]
+----
+CamelContext camel = ...
+
+camel.setReactiveExecutor(new VertXReactiveExecutor());
+----
+
+Or in XML DSL (spring or blueprint XML file) you can declare the reactive executor as a `<bean>`:
+
+[source,xml]
+----
+<bean id="vertxReactiveExecutor" class="org.apache.camel.reactive.vertx.VertXReactiveExecutor"/>
+----
+
+and then Camel should detect the bean and use the reactive executor.
\ No newline at end of file
diff --git a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 922b4b5..38a4262 100644
--- a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -35,6 +35,18 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe
     private static final Logger LOG = LoggerFactory.getLogger(VertXReactiveExecutor.class);
 
     private Vertx vertx;
+    private boolean shouldClose;
+
+    public Vertx getVertx() {
+        return vertx;
+    }
+
+    /**
+     * To use an existing instance of {@link Vertx} instead of creating a default instance.
+     */
+    public void setVertx(Vertx vertx) {
+        this.vertx = vertx;
+    }
 
     @Override
     public void schedule(Runnable runnable, String description) {
@@ -88,13 +100,18 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe
 
     @Override
     protected void doStart() throws Exception {
-        LOG.debug("Starting VertX");
-        vertx = Vertx.vertx();
+        if (vertx == null) {
+            LOG.debug("Starting VertX");
+            shouldClose = true;
+            vertx = Vertx.vertx();
+        }
     }
 
     @Override
     protected void doStop() throws Exception {
-        LOG.debug("Stopping VertX");
-        vertx.close();
+        if (vertx != null && shouldClose) {
+            LOG.debug("Stopping VertX");
+            vertx.close();
+        }
     }
 }
diff --git a/examples/README.adoc b/examples/README.adoc
index fcb112d0..0edaf62 100644
--- a/examples/README.adoc
+++ b/examples/README.adoc
@@ -221,6 +221,8 @@ Number of Examples: 111 (0 deprecated)
 
 | link:camel-example-kotlin/ReadMe.md[Kotlin] (camel-example-kotlin) | Other Languages | A Camel route using Kotlin
 
+| link:camel-example-reactive-executor-vertx/readme.adoc[Reactive Executor Vertx] (camel-example-reactive-executor-vertx) | Reactive | An example for showing using VertX as reactive executor with standalone Camel
+
 | link:camel-example-reactive-streams/readme.adoc[Reactive Streams] (camel-example-reactive-streams) | Reactive | An example that shows how Camel can exchange data using reactive streams with Spring Boot reactor
     
 
diff --git a/examples/camel-example-main/pom.xml b/examples/camel-example-main/pom.xml
index d33e285..43d1ee8 100644
--- a/examples/camel-example-main/pom.xml
+++ b/examples/camel-example-main/pom.xml
@@ -51,10 +51,6 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-quartz2</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-reactive-executor-vertx</artifactId>
-        </dependency>
 
         <!-- logging -->
         <dependency>
diff --git a/examples/camel-example-main/pom.xml b/examples/camel-example-reactive-executor-vertx/pom.xml
similarity index 89%
copy from examples/camel-example-main/pom.xml
copy to examples/camel-example-reactive-executor-vertx/pom.xml
index d33e285..ce4ecf4 100644
--- a/examples/camel-example-main/pom.xml
+++ b/examples/camel-example-reactive-executor-vertx/pom.xml
@@ -28,13 +28,13 @@
         <version>3.0.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>camel-example-main</artifactId>
+    <artifactId>camel-example-reactive-executor-vertx</artifactId>
     <packaging>jar</packaging>
-    <name>Camel :: Example :: Main</name>
-    <description>An example for showing standalone Camel</description>
+    <name>Camel :: Example :: Reactive Executor :: VertX</name>
+    <description>An example for showing using VertX as reactive executor with standalone Camel</description>
 
     <properties>
-        <category>Beginner</category>
+        <category>Reactive</category>
     </properties>
 
     <dependencies>
@@ -49,10 +49,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-quartz2</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
             <artifactId>camel-reactive-executor-vertx</artifactId>
         </dependency>
 
@@ -91,7 +87,6 @@
                     <mainClass>org.apache.camel.example.MyApplication</mainClass>
                 </configuration>
             </plugin>
-
         </plugins>
     </build>
 
diff --git a/examples/camel-example-reactive-executor-vertx/readme.adoc b/examples/camel-example-reactive-executor-vertx/readme.adoc
new file mode 100644
index 0000000..49af783
--- /dev/null
+++ b/examples/camel-example-reactive-executor-vertx/readme.adoc
@@ -0,0 +1,15 @@
+== Camel Example Reactive Executor VertX
+
+This example uses VertX as the reactive executor for routing messages with Camel.
+By default Camel uses its own reactive engine for routing messages, but you can plug in
+different engines via a SPI interface. This example uses VertX as the engine.
+
+=== How to run
+
+You can run this example using
+
+    mvn camel:run   
+
+=== More information
+
+You can find more information about Apache Camel at the website: http://camel.apache.org/
diff --git a/examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyApplication.java b/examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyApplication.java
new file mode 100644
index 0000000..8c1cc6a
--- /dev/null
+++ b/examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyApplication.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import org.apache.camel.main.Main;
+
+/**
+ * Main class that boots the Camel application
+ */
+public final class MyApplication {
+
+    private MyApplication() {
+    }
+
+    public static void main(String[] args) throws Exception {
+        // use Camel's Main class
+        Main main = new Main();
+        // and add the routes (you can specify multiple classes)
+        main.addRouteBuilder(MyRouteBuilder.class);
+        // now keep the application running until the JVM is terminated (ctrl + c or sigterm)
+        main.run(args);
+    }
+
+}
diff --git a/examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyRouteBuilder.java
new file mode 100644
index 0000000..003edb5
--- /dev/null
+++ b/examples/camel-example-reactive-executor-vertx/src/main/java/org/apache/camel/example/MyRouteBuilder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class MyRouteBuilder extends RouteBuilder {
+
+    @Override
+    public void configure() throws Exception {
+        from("timer:foo?period=2s")
+            .setBody().constant("Hello World")
+            .delay(simple("${random(0,1000)}"))
+            .log("${body}");
+    }
+}
diff --git a/examples/camel-example-reactive-executor-vertx/src/main/resources/application.properties b/examples/camel-example-reactive-executor-vertx/src/main/resources/application.properties
new file mode 100644
index 0000000..0afb55c
--- /dev/null
+++ b/examples/camel-example-reactive-executor-vertx/src/main/resources/application.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# to configure camel main
+# here you can configure options on camel main (see MainConfigurationProperties class)
+camel.main.name = MyVertXCamel
+camel.main.jmx-enabled = false
+
+# you can also configure camel context directly
+# camel.context.shutdown-strategy.shutdown-now-on-timeout = false
+
+# application properties
+hi = Hello
+
diff --git a/examples/camel-example-reactive-executor-vertx/src/main/resources/log4j2.properties b/examples/camel-example-reactive-executor-vertx/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..d406a9f
--- /dev/null
+++ b/examples/camel-example-reactive-executor-vertx/src/main/resources/log4j2.properties
@@ -0,0 +1,23 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.out.ref = out
diff --git a/examples/pom.xml b/examples/pom.xml
index 983a3d1..e81d72e 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -94,6 +94,7 @@
         <module>camel-example-opentracing</module>
         <module>camel-example-pojo-messaging</module>
         <module>camel-example-rabbitmq</module>
+        <module>camel-example-reactive-executor-vertx</module>
         <module>camel-example-reactive-streams</module>
         <module>camel-example-rest-producer</module>
         <module>camel-example-rest-swagger</module>


[camel] 06/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7c053303fa9794e54566ffedbc8a7e97d5085536
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 14:26:37 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     | 32 +++++++++++---
 .../camel/impl/engine/AbstractCamelContext.java    |  7 +++-
 .../camel/impl/engine/DefaultReactiveExecutor.java | 49 ++++++++++++----------
 .../camel/processor/CamelInternalProcessor.java    |  8 ++--
 4 files changed, 62 insertions(+), 34 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 4a21127..37744fb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -26,20 +26,40 @@ public interface ReactiveExecutor {
     // TODO: Add javadoc
     // TODO: Better name
 
-    void scheduleMain(Runnable runnable);
+    default void schedule(Runnable runnable) {
+        schedule(runnable, null);
+    }
 
-    void scheduleSync(Runnable runnable);
+    void schedule(Runnable runnable, String description);
 
-    void scheduleMain(Runnable runnable, String description);
+    default void scheduleMain(Runnable runnable) {
+        scheduleMain(runnable, null);
+    }
 
-    void schedule(Runnable runnable);
+    void scheduleMain(Runnable runnable, String description);
 
-    void schedule(Runnable runnable, String description);
+    default void scheduleSync(Runnable runnable) {
+        scheduleSync(runnable, null);
+    }
 
     void scheduleSync(Runnable runnable, String description);
 
+    // TODO: Can we make this so we don't need a method on this interface as it's only used once
     boolean executeFromQueue();
 
-    void callback(AsyncCallback callback);
+    default void callback(AsyncCallback callback) {
+        schedule(new Runnable() {
+
+            @Override
+            public void run() {
+                callback.done(false);
+            }
+
+            @Override
+            public String toString() {
+                return "Callback[" + callback + "]";
+            }
+        });
+    }
 
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 01f5d6e..0987b86 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2692,8 +2692,9 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
             shutdownServices(notifier);
         }
 
-        // shutdown executor service and management as the last one
+        // shutdown executor service, reactive executor and management as the last one
         shutdownServices(executorServiceManager);
+        shutdownServices(reactiveExecutor);
         shutdownServices(managementStrategy);
         shutdownServices(managementMBeanAssembler);
         shutdownServices(lifecycleStrategies);
@@ -3911,7 +3912,9 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
     }
 
     public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) {
-        this.reactiveExecutor = reactiveExecutor;
+        // special for reactiveExecutor as we want to stop it manually so
+        // false in stopOnShutdown
+        this.reactiveExecutor = doAddService(reactiveExecutor, false);
     }
 
     @Override
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 3ce7f0a..e094999 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -19,16 +19,17 @@ package org.apache.camel.impl.engine;
 import java.util.LinkedList;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ReactiveExecutor;
+import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ReactiveExecutor}.
  */
-public class DefaultReactiveExecutor implements ReactiveExecutor {
+public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService {
 
-    // TODO: StaticServiceSupport so we can init/start/stop
     // TODO: Add mbean info so we can get details
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class);
@@ -36,33 +37,27 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
     private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new);
 
     @Override
-    public void scheduleMain(Runnable runnable) {
-        workers.get().schedule(runnable, true, true, false);
-    }
-
-    @Override
-    public void scheduleSync(Runnable runnable) {
-        workers.get().schedule(runnable, true, true, true);
-    }
-
-    @Override
     public void scheduleMain(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), true, true, false);
-    }
-
-    @Override
-    public void schedule(Runnable runnable) {
-        workers.get().schedule(runnable, true, false, false);;
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, true, true, false);
     }
 
     @Override
     public void schedule(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), true, false, false);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, true, false, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), false, true, true);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, false, true, true);
     }
 
     @Override
@@ -97,13 +92,23 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
         };
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
     private static class Worker {
 
         private volatile LinkedList<Runnable> queue = new LinkedList<>();
         private volatile LinkedList<LinkedList<Runnable>> back;
         private volatile boolean running;
 
-        public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
+        void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
             if (main) {
                 if (!queue.isEmpty()) {
                     if (back == null) {
@@ -149,7 +154,7 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
             }
         }
 
-        public boolean executeFromQueue() {
+        boolean executeFromQueue() {
             final Runnable polled = queue != null ? queue.poll() : null;
             if (polled == null) {
                 return false;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index f039f27..3651856 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -120,7 +120,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     }
 
     @Override
-    public boolean process(Exchange exchange, AsyncCallback ocallback) {
+    public boolean process(Exchange exchange, AsyncCallback originalCallback) {
         // ----------------------------------------------------------
         // CAMEL END USER - READ ME FOR DEBUGGING TIPS
         // ----------------------------------------------------------
@@ -137,7 +137,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
         if (processor == null || !continueProcessing(exchange)) {
             // no processor or we should not continue then we are done
-            ocallback.done(true);
+            originalCallback.done(true);
             return true;
         }
 
@@ -151,7 +151,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 states[i] = state;
             } catch (Throwable e) {
                 exchange.setException(e);
-                ocallback.done(true);
+                originalCallback.done(true);
                 return true;
             }
         }
@@ -174,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                exchange.getContext().getReactiveExecutor().callback(ocallback);
+                exchange.getContext().getReactiveExecutor().callback(originalCallback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------


[camel] 13/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 86d23b581d6ef84c29b3faeefeb945eab38efe5a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 10:44:35 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     |  2 +-
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  1 -
 .../camel/impl/engine/DefaultReactiveExecutor.java |  9 +++++++
 .../processor/SharedCamelInternalProcessor.java    |  8 +++----
 .../reactive/vertx/VertXReactiveExecutor.java      | 28 +++++++++++++++++++---
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index f61b012..2a4eb9f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -75,7 +75,7 @@ public interface ReactiveExecutor {
     void scheduleSync(Runnable runnable, String description);
 
     /**
-     * Executes the next task
+     * Executes the next task (if supported by the reactive executor implementation)
      *
      * @return true if a task was executed or false if no more pending tasks
      */
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 8087942..d0e571c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -71,7 +71,6 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
      *
      * @param processor the processor
      * @param exchange  the exchange
-     * @throws Exception can be thrown if waiting is interrupted
      */
     public void process(final AsyncProcessor processor, final Exchange exchange) {
         CountDownLatch latch = new CountDownLatch(1);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 350e189..17f69e7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -144,6 +144,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
         }
 
         void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable);
+            }
             if (main) {
                 if (!queue.isEmpty()) {
                     if (back == null) {
@@ -179,6 +182,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                         try {
                             executor.pendingTasks.decrementAndGet();
 //                            thread.setName(name + " - " + polled.toString());
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Running: {}", polled);
+                            }
                             polled.run();
                         } catch (Throwable t) {
                             LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
@@ -204,6 +210,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
             try {
                 executor.pendingTasks.decrementAndGet();
                 thread.setName(name + " - " + polled.toString());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Running: {}", polled);
+                }
                 polled.run();
             } catch (Throwable t) {
                 // should not happen
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 1a5e92d..cd4405b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -107,7 +107,7 @@ public class SharedCamelInternalProcessor {
     /**
      * Asynchronous API
      */
-    public boolean process(Exchange exchange, AsyncCallback ocallback, AsyncProcessor processor, Processor resultProcessor) {
+    public boolean process(Exchange exchange, AsyncCallback originalCallback, AsyncProcessor processor, Processor resultProcessor) {
         // ----------------------------------------------------------
         // CAMEL END USER - READ ME FOR DEBUGGING TIPS
         // ----------------------------------------------------------
@@ -124,7 +124,7 @@ public class SharedCamelInternalProcessor {
 
         if (processor == null || !continueProcessing(exchange, processor)) {
             // no processor or we should not continue then we are done
-            ocallback.done(true);
+            originalCallback.done(true);
             return true;
         }
 
@@ -138,13 +138,13 @@ public class SharedCamelInternalProcessor {
                 states[i] = state;
             } catch (Throwable e) {
                 exchange.setException(e);
-                ocallback.done(true);
+                originalCallback.done(true);
                 return true;
             }
         }
 
         // create internal callback which will execute the advices in reverse order when done
-        AsyncCallback callback = new InternalCallback(states, exchange, ocallback, resultProcessor);
+        AsyncCallback callback = new InternalCallback(states, exchange, originalCallback, resultProcessor);
 
         // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
         Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
diff --git a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 47bd7b4..922b4b5 100644
--- a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -39,31 +39,53 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe
     @Override
     public void schedule(Runnable runnable, String description) {
         LOG.trace("schedule: {}", runnable);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
         vertx.nettyEventLoopGroup().execute(runnable);
     }
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
         LOG.trace("scheduleMain: {}", runnable);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
         vertx.nettyEventLoopGroup().execute(runnable);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
         LOG.trace("scheduleSync: {}", runnable);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        final Runnable task = runnable;
         vertx.executeBlocking(future -> {
-            runnable.run();
+            task.run();
             future.complete();
         }, res -> {});
     }
 
     @Override
     public boolean executeFromQueue() {
-        LOG.trace("executeFromQueue");
-        // TODO: not implemented
+        // not supported so return false
         return false;
     }
 
+    private static Runnable describe(Runnable runnable, String description) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                runnable.run();
+            }
+            @Override
+            public String toString() {
+                return description;
+            }
+        };
+    }
+
     @Override
     protected void doStart() throws Exception {
         LOG.debug("Starting VertX");


[camel] 10/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 344d5487ce06398e09f9529439a65ad6ed04c3e6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 09:59:42 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/impl/engine/AbstractCamelContext.java    |  4 +-
 core/camel-caffeine-lrucache/pom.xml               |  2 +-
 core/camel-headersmap/pom.xml                      |  2 +-
 core/camel-jaxp/pom.xml                            |  6 +-
 .../pom.xml                                        | 22 +++----
 .../camel/reacitve/VertXReactiveExecutor.java      | 74 ++++++++++++++++++++++
 .../services/org/apache/camel/reactive-executor    | 18 ++++++
 .../org/apache/camel/reactive/SimpleMockTest.java  | 66 +++++++++++++++++++
 .../src/test/resources/log4j2.properties           | 31 +++++++++
 core/pom.xml                                       |  1 +
 10 files changed, 208 insertions(+), 18 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 46217c2..d52a395 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2503,7 +2503,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
 
         forceLazyInitialization();
 
-        // if camel-bean is on classpath then we can load its bean proxy facory
+        // if camel-bean is on classpath then we can load its bean proxy factory
         BeanProxyFactory beanProxyFactory = new BeanProxyFactoryResolver().resolve(this);
         if (beanProxyFactory != null) {
             addService(beanProxyFactory);
@@ -2581,6 +2581,8 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
                      getHeadersMapFactory());
         }
 
+        log.info("Using ReactiveExecutor: {}", getReactiveExecutor());
+
         // start routes
         if (doNotStartRoutesOnFirstStart) {
             log.debug("Skip starting routes as CamelContext has been configured with autoStartup=false");
diff --git a/core/camel-caffeine-lrucache/pom.xml b/core/camel-caffeine-lrucache/pom.xml
index 1d2beff..cbc0b9c 100644
--- a/core/camel-caffeine-lrucache/pom.xml
+++ b/core/camel-caffeine-lrucache/pom.xml
@@ -35,7 +35,7 @@
 
     <properties>
         <firstVersion>3.0</firstVersion>
-        <label>tooling</label>
+        <label>core</label>
     </properties>
 
     <dependencies>
diff --git a/core/camel-headersmap/pom.xml b/core/camel-headersmap/pom.xml
index 33b6181..3852bbd 100644
--- a/core/camel-headersmap/pom.xml
+++ b/core/camel-headersmap/pom.xml
@@ -34,7 +34,7 @@
 
     <properties>
         <firstVersion>2.20.0</firstVersion>
-        <label>tooling</label>
+        <label>core</label>
     </properties>
 
     <dependencies>
diff --git a/core/camel-jaxp/pom.xml b/core/camel-jaxp/pom.xml
index 6d8e8db..48eb4d8 100644
--- a/core/camel-jaxp/pom.xml
+++ b/core/camel-jaxp/pom.xml
@@ -30,10 +30,14 @@
 
     <artifactId>camel-jaxp</artifactId>
     <packaging>jar</packaging>
-
     <name>Camel :: JAXP</name>
     <description>Camel JAXP Support</description>
 
+    <properties>
+        <firstVersion>3.0.0</firstVersion>
+        <label>core</label>
+    </properties>
+
     <dependencies>
 
         <!-- camel annotations -->
diff --git a/core/camel-headersmap/pom.xml b/core/camel-reactive-executor-vertx/pom.xml
similarity index 81%
copy from core/camel-headersmap/pom.xml
copy to core/camel-reactive-executor-vertx/pom.xml
index 33b6181..bfa6ccd 100644
--- a/core/camel-headersmap/pom.xml
+++ b/core/camel-reactive-executor-vertx/pom.xml
@@ -27,14 +27,14 @@
         <version>3.0.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>camel-headersmap</artifactId>
+    <artifactId>camel-reactive-executor-vertx</artifactId>
     <packaging>jar</packaging>
-    <name>Camel :: Headers Map</name>
-    <description>Fast case-insensitive headers map implementation</description>
+    <name>Camel :: Reactive Executor :: Vert X</name>
+    <description>Reactive Executor for camel-core using Vert X</description>
 
     <properties>
-        <firstVersion>2.20.0</firstVersion>
-        <label>tooling</label>
+        <firstVersion>3.0.0</firstVersion>
+        <label>core</label>
     </properties>
 
     <dependencies>
@@ -45,15 +45,9 @@
         </dependency>
 
         <dependency>
-            <groupId>com.cedarsoftware</groupId>
-            <artifactId>java-util</artifactId>
-            <version>${java-util-version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-core</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-core</artifactId>
+            <version>${vertx-version}</version>
         </dependency>
 
         <!-- testing -->
diff --git a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reacitve/VertXReactiveExecutor.java b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reacitve/VertXReactiveExecutor.java
new file mode 100644
index 0000000..5e2f507
--- /dev/null
+++ b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reacitve/VertXReactiveExecutor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.reacitve;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.StaticService;
+import org.apache.camel.spi.ReactiveExecutor;
+import org.apache.camel.support.service.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A VertX based {@link ReactiveExecutor} that uses Vert X event loop.
+ */
+public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VertXReactiveExecutor.class);
+
+    private Vertx vertx;
+
+    @Override
+    public void schedule(Runnable runnable, String description) {
+        LOG.trace("schedule: {}", runnable);
+        vertx.nettyEventLoopGroup().execute(runnable);
+    }
+
+    @Override
+    public void scheduleMain(Runnable runnable, String description) {
+        LOG.trace("scheduleMain: {}", runnable);
+        vertx.nettyEventLoopGroup().execute(runnable);
+    }
+
+    @Override
+    public void scheduleSync(Runnable runnable, String description) {
+        LOG.trace("scheduleSync: {}", runnable);
+        vertx.executeBlocking(future -> {
+            runnable.run();
+            future.complete();
+        }, res -> {});
+    }
+
+    @Override
+    public boolean executeFromQueue() {
+        LOG.trace("executeFromQueue");
+        // TODO: not implemented
+        return false;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        LOG.debug("Starting VertX");
+        vertx = Vertx.vertx();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        LOG.debug("Stopping VertX");
+        vertx.close();
+    }
+}
diff --git a/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor b/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
new file mode 100644
index 0000000..68975fc
--- /dev/null
+++ b/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.reactive.VertXReactiveExecutor
diff --git a/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
new file mode 100644
index 0000000..924952f
--- /dev/null
+++ b/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.reactive;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.reacitve.VertXReactiveExecutor;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class SimpleMockTest extends CamelTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        // TODO: should detect from classpath
+        CamelContext context = super.createCamelContext();
+        context.setReactiveExecutor(new VertXReactiveExecutor());
+        return context;
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testSimpleTwoMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:start", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("log:foo").to("log:bar").to("mock:result");
+            }
+        };
+    }
+}
diff --git a/core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties b/core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..2cebd6a
--- /dev/null
+++ b/core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
@@ -0,0 +1,31 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.out.type = File
+appender.out.name = out
+appender.out.fileName = target/camel-reactive-executor-vertx.log
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.stdout.type = Console
+appender.stdout.name = stdout
+appender.stdout.layout.type = PatternLayout
+appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+rootLogger.level = INFO
+
+rootLogger.appenderRef.out.ref = out
+#rootLogger.appenderRef.out.ref = stdout
diff --git a/core/pom.xml b/core/pom.xml
index 0792804..5da53d0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -40,6 +40,7 @@
         <module>camel-support</module>
         <module>camel-caffeine-lrucache</module>
         <module>camel-headersmap</module>
+        <module>camel-reactive-executor-vertx</module>
         <module>camel-management-api</module>
         <module>camel-management-impl</module>
         <module>camel-base</module>


[camel] 15/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cb3cebaf5916af6bedce25602c6f17fd5875f682
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 11:09:26 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../src/main/docs/reactive-executor-vertx.adoc                          | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc b/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
index e541df4..a586fe6 100644
--- a/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
+++ b/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
@@ -8,6 +8,8 @@ By default Camel uses its own reactive engine for routing messages, but you can
 different engines via a SPI interface. This is a VertX based plugin that uses the VertX event loop
 for processing message during routing.
 
+NOTE: This component is currently experimental, so use it with care.
+
 === VertX instance
 
 This implementation will by default create a default `io.vertx.core.Vertx` instance to be used.


[camel] 12/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 611590e2c86f6a18faa81ece73c373b6b261ddab
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 10:26:28 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 apache-camel/pom.xml                                               | 6 +++++-
 apache-camel/src/main/descriptors/common-bin.xml                   | 1 +
 .../java/org/apache/camel/impl/engine/AbstractCamelContext.java    | 7 ++++++-
 core/camel-reactive-executor-vertx/pom.xml                         | 5 +++++
 .../org/apache/camel/reactive/vertx/VertXReactiveExecutor.java     | 4 ++++
 .../src/test/java/org/apache/camel/reactive/SimpleMockTest.java    | 1 -
 examples/camel-example-main/pom.xml                                | 4 ++++
 parent/pom.xml                                                     | 5 +++++
 8 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index 7de3b89..f28438e 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -100,7 +100,11 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-main</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-reactive-executor-vertx</artifactId>
+        </dependency>
+        
         <!-- NOTE: auto-generated list of components when building camel catalog -->
         <!-- camel components: START -->
     <dependency>
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index f40548a..2e74e8f 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -374,6 +374,7 @@
         <include>org.apache.camel:camel-route-parser</include>
         <include>org.apache.camel:camel-headersmap</include>
         <include>org.apache.camel:camel-caffeine-lrucache</include>
+        <include>org.apache.camel:camel-reactive-executor-vertx</include>
       </includes>
     </dependencySet>
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index d52a395..7519bcd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2581,7 +2581,12 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
                      getHeadersMapFactory());
         }
 
-        log.info("Using ReactiveExecutor: {}", getReactiveExecutor());
+        // lets log at INFO level if we are not using the default reactive executor
+        if (!getReactiveExecutor().getClass().getSimpleName().equals("DefaultReactiveExecutor")) {
+            log.info("Using ReactiveExecutor: {}", getReactiveExecutor());
+        } else {
+            log.debug("Using ReactiveExecutor: {}", getReactiveExecutor());
+        }
 
         // start routes
         if (doNotStartRoutesOnFirstStart) {
diff --git a/core/camel-reactive-executor-vertx/pom.xml b/core/camel-reactive-executor-vertx/pom.xml
index bfa6ccd..0ae3475 100644
--- a/core/camel-reactive-executor-vertx/pom.xml
+++ b/core/camel-reactive-executor-vertx/pom.xml
@@ -41,6 +41,11 @@
 
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>meta-annotations</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
         </dependency>
 
diff --git a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 298875c..47bd7b4 100644
--- a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -18,6 +18,7 @@ package org.apache.camel.reactive.vertx;
 
 import io.vertx.core.Vertx;
 import org.apache.camel.StaticService;
+import org.apache.camel.meta.Experimental;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
@@ -25,7 +26,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A VertX based {@link ReactiveExecutor} that uses Vert X event loop.
+ * <p/>
+ * NOTE: This is an experimental implementation (use with care)
  */
+@Experimental
 public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService {
 
     private static final Logger LOG = LoggerFactory.getLogger(VertXReactiveExecutor.class);
diff --git a/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
index 3383e73..0fb845f 100644
--- a/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
+++ b/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
@@ -27,7 +27,6 @@ public class SimpleMockTest extends CamelTestSupport {
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        // TODO: should detect from classpath
         CamelContext context = super.createCamelContext();
         context.setReactiveExecutor(new VertXReactiveExecutor());
         return context;
diff --git a/examples/camel-example-main/pom.xml b/examples/camel-example-main/pom.xml
index 43d1ee8..d33e285 100644
--- a/examples/camel-example-main/pom.xml
+++ b/examples/camel-example-main/pom.xml
@@ -51,6 +51,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-quartz2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-reactive-executor-vertx</artifactId>
+        </dependency>
 
         <!-- logging -->
         <dependency>
diff --git a/parent/pom.xml b/parent/pom.xml
index 87a6f35..d6a24f3 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -859,6 +859,11 @@
                 <artifactId>camel-main</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-reactive-executor-vertx</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <!-- NOTE: auto-generated list of components when building camel catalog -->
             <!-- camel components: START -->


[camel] 16/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit dee4f9bab46531d356239ecc6bbca2620bee4aca
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 11:38:42 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 apache-camel/pom.xml                               |  10 +
 apache-camel/src/main/descriptors/common-bin.xml   |   2 +
 bom/camel-bom/pom.xml                              |   5 +
 .../camel-reactive-executor-vertx/pom.xml          |   6 +-
 .../src/main/docs/reactive-executor-vertx.adoc     |   0
 .../reactive/vertx/VertXReactiveExecutor.java      |   0
 .../services/org/apache/camel/reactive-executor    |   0
 .../org/apache/camel/reactive/SimpleMockTest.java  |   0
 .../src/test/resources/log4j2.properties           |   0
 components/pom.xml                                 |   1 +
 components/readme.adoc                             |   4 +-
 core/pom.xml                                       |   1 -
 docs/components/modules/ROOT/nav.adoc              |   1 +
 .../ROOT/pages}/reactive-executor-vertx.adoc       |   0
 parent/pom.xml                                     |  10 +
 .../camel-reactive-executor-vertx-starter/pom.xml  |  53 ++++++
 .../src/main/resources/META-INF/LICENSE.txt        | 203 +++++++++++++++++++++
 .../src/main/resources/META-INF/NOTICE.txt         |  11 ++
 .../src/main/resources/META-INF/spring.provides    |  16 +-
 platforms/spring-boot/components-starter/pom.xml   |   1 +
 .../camel-spring-boot-dependencies/pom.xml         |   5 +
 21 files changed, 308 insertions(+), 21 deletions(-)

diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index f28438e..2986a98 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -1154,6 +1154,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-reactive-executor-vertx</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-reactive-streams</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -2633,6 +2638,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-reactive-executor-vertx-starter</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-reactive-streams-starter</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index 2e74e8f..6056ee7 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -250,6 +250,7 @@
         <include>org.apache.camel:camel-quartz2</include>
         <include>org.apache.camel:camel-quickfix</include>
         <include>org.apache.camel:camel-rabbitmq</include>
+        <include>org.apache.camel:camel-reactive-executor-vertx</include>
         <include>org.apache.camel:camel-reactive-streams</include>
         <include>org.apache.camel:camel-reactor</include>
         <include>org.apache.camel:camel-ref</include>
@@ -585,6 +586,7 @@
         <include>org.apache.camel:camel-quartz2-starter</include>
         <include>org.apache.camel:camel-quickfix-starter</include>
         <include>org.apache.camel:camel-rabbitmq-starter</include>
+        <include>org.apache.camel:camel-reactive-executor-vertx-starter</include>
         <include>org.apache.camel:camel-reactive-streams-starter</include>
         <include>org.apache.camel:camel-reactor-starter</include>
         <include>org.apache.camel:camel-ref-starter</include>
diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml
index a1c75f6..18f4902 100644
--- a/bom/camel-bom/pom.xml
+++ b/bom/camel-bom/pom.xml
@@ -2200,6 +2200,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-reactive-executor-vertx</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-reactive-streams</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git a/core/camel-reactive-executor-vertx/pom.xml b/components/camel-reactive-executor-vertx/pom.xml
similarity index 96%
rename from core/camel-reactive-executor-vertx/pom.xml
rename to components/camel-reactive-executor-vertx/pom.xml
index d76289a..232028d 100644
--- a/core/camel-reactive-executor-vertx/pom.xml
+++ b/components/camel-reactive-executor-vertx/pom.xml
@@ -23,12 +23,10 @@
 
     <parent>
         <groupId>org.apache.camel</groupId>
-        <artifactId>core</artifactId>
+        <artifactId>components</artifactId>
         <version>3.0.0-SNAPSHOT</version>
     </parent>
 
-    <!-- TODO: move to components -->
-
     <artifactId>camel-reactive-executor-vertx</artifactId>
     <packaging>jar</packaging>
     <name>Camel :: Reactive Executor :: Vert X</name>
@@ -36,7 +34,7 @@
 
     <properties>
         <firstVersion>3.0.0</firstVersion>
-        <label>core</label>
+        <label>reactive</label>
     </properties>
 
     <dependencies>
diff --git a/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc b/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
similarity index 100%
copy from core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
copy to components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
diff --git a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
similarity index 100%
rename from core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
rename to components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
diff --git a/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor b/components/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
similarity index 100%
rename from core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
rename to components/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
diff --git a/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
similarity index 100%
rename from core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
rename to components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
diff --git a/core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties b/components/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
similarity index 100%
copy from core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
copy to components/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
diff --git a/components/pom.xml b/components/pom.xml
index 97da906..ebf8a3d 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -265,6 +265,7 @@
         <module>camel-quartz2</module>
         <module>camel-quickfix</module>
         <module>camel-rabbitmq</module>
+        <module>camel-reactive-executor-vertx</module>
         <module>camel-reactive-streams</module>
         <module>camel-reactor</module>
         <module>camel-rest-swagger</module>
diff --git a/components/readme.adoc b/components/readme.adoc
index c3ee990..8ac82b7 100644
--- a/components/readme.adoc
+++ b/components/readme.adoc
@@ -1050,7 +1050,7 @@ Number of Languages: 17 in 11 JAR artifacts (0 deprecated)
 ==== Miscellaneous Components
 
 // others: START
-Number of Miscellaneous Components: 30 in 30 JAR artifacts (0 deprecated)
+Number of Miscellaneous Components: 31 in 31 JAR artifacts (0 deprecated)
 
 [width="100%",cols="4,1,5",options="header"]
 |===
@@ -1074,6 +1074,8 @@ Number of Miscellaneous Components: 30 in 30 JAR artifacts (0 deprecated)
 
 | link:camel-opentracing/src/main/docs/opentracing.adoc[OpenTracing] (camel-opentracing) | 2.19 | Distributed tracing using OpenTracing
 
+| link:camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc[Reactive Executor Vertx] (camel-reactive-executor-vertx) | 3.0 | Reactive Executor for camel-core using Vert X
+
 | link:camel-reactor/src/main/docs/reactor.adoc[Reactor] (camel-reactor) | 2.20 | Reactor based back-end for Camel's reactive streams component
 
 | link:camel-ribbon/src/main/docs/ribbon.adoc[Ribbon] (camel-ribbon) | 2.18 | Using Netflix Ribbon for client side load balancing
diff --git a/core/pom.xml b/core/pom.xml
index 5da53d0..0792804 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -40,7 +40,6 @@
         <module>camel-support</module>
         <module>camel-caffeine-lrucache</module>
         <module>camel-headersmap</module>
-        <module>camel-reactive-executor-vertx</module>
         <module>camel-management-api</module>
         <module>camel-management-impl</module>
         <module>camel-base</module>
diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc
index a190bdf..01f9fba 100644
--- a/docs/components/modules/ROOT/nav.adoc
+++ b/docs/components/modules/ROOT/nav.adoc
@@ -265,6 +265,7 @@
 * xref:quartz2-component.adoc[Quartz2 Component]
 * xref:quickfix-component.adoc[QuickFix Component]
 * xref:rabbitmq-component.adoc[RabbitMQ Component]
+* xref:reactive-executor-vertx.adoc[ReactiveExecutor VertX]
 * xref:reactive-streams-component.adoc[Reactive Streams Component]
 * xref:reactor-component.adoc[Reactor Component]
 * xref:ref-component.adoc[Ref Component]
diff --git a/core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc b/docs/components/modules/ROOT/pages/reactive-executor-vertx.adoc
similarity index 100%
rename from core/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
rename to docs/components/modules/ROOT/pages/reactive-executor-vertx.adoc
diff --git a/parent/pom.xml b/parent/pom.xml
index d6a24f3..ae4d237 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -1944,6 +1944,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-reactive-executor-vertx</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-reactive-streams</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -3443,6 +3448,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-reactive-executor-vertx-starter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-reactive-streams-starter</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git a/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/pom.xml b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/pom.xml
new file mode 100644
index 0000000..e4baa40
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components-starter</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>camel-reactive-executor-vertx-starter</artifactId>
+  <packaging>jar</packaging>
+  <name>Spring-Boot Starter :: Camel :: Reactive Executor :: Vert X</name>
+  <description>Spring-Boot Starter for Reactive Executor for camel-core using Vert X</description>
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter</artifactId>
+      <version>${spring-boot-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-reactive-executor-vertx</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!--START OF GENERATED CODE-->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core-starter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-spring-boot-starter</artifactId>
+    </dependency>
+    <!--END OF GENERATED CODE-->
+  </dependencies>
+</project>
diff --git a/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/LICENSE.txt b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/NOTICE.txt b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+   =========================================================================
+   ==  NOTICE file corresponding to the section 4 d of                    ==
+   ==  the Apache License, Version 2.0,                                   ==
+   ==  in this case for the Apache Camel distribution.                    ==
+   =========================================================================
+
+   This product includes software developed by
+   The Apache Software Foundation (http://www.apache.org/).
+
+   Please read the different LICENSE files present in the licenses directory of
+   this distribution.
diff --git a/core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/spring.provides
similarity index 65%
rename from core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
rename to platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/spring.provides
index 2cebd6a..8707c2a 100644
--- a/core/camel-reactive-executor-vertx/src/test/resources/log4j2.properties
+++ b/platforms/spring-boot/components-starter/camel-reactive-executor-vertx-starter/src/main/resources/META-INF/spring.provides
@@ -14,18 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-
-appender.out.type = File
-appender.out.name = out
-appender.out.fileName = target/camel-reactive-executor-vertx.log
-appender.out.layout.type = PatternLayout
-appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-appender.stdout.type = Console
-appender.stdout.name = stdout
-appender.stdout.layout.type = PatternLayout
-appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-
-rootLogger.level = INFO
-
-rootLogger.appenderRef.out.ref = out
-#rootLogger.appenderRef.out.ref = stdout
+provides: camel-reactive-executor-vertx
diff --git a/platforms/spring-boot/components-starter/pom.xml b/platforms/spring-boot/components-starter/pom.xml
index de0cf7c..718f6fd 100644
--- a/platforms/spring-boot/components-starter/pom.xml
+++ b/platforms/spring-boot/components-starter/pom.xml
@@ -301,6 +301,7 @@
     <module>camel-quartz2-starter</module>
     <module>camel-quickfix-starter</module>
     <module>camel-rabbitmq-starter</module>
+    <module>camel-reactive-executor-vertx-starter</module>
     <module>camel-reactive-streams-starter</module>
     <module>camel-reactor-starter</module>
     <module>camel-ref-starter</module>
diff --git a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
index 2a89a6f..8e7aa34 100644
--- a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
+++ b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
@@ -2420,6 +2420,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-reactive-executor-vertx</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-reactive-streams</artifactId>
         <version>${project.version}</version>
       </dependency>


[camel] 18/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 06beebaa59a0e1143e3fe0d49298043430a81a8b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 14:36:58 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 parent/pom.xml | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/parent/pom.xml b/parent/pom.xml
index ae4d237..1ed44d6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -859,11 +859,6 @@
                 <artifactId>camel-main</artifactId>
                 <version>${project.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.apache.camel</groupId>
-                <artifactId>camel-reactive-executor-vertx</artifactId>
-                <version>${project.version}</version>
-            </dependency>
 
             <!-- NOTE: auto-generated list of components when building camel catalog -->
             <!-- camel components: START -->


[camel] 05/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d6a47b23388dd4a40ad53e8c9703f45ab3747023
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 14:15:24 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/impl/engine/DefaultReactiveExecutor.java | 133 ++++++++++++++++++---
 .../org/apache/camel/support/ReactiveHelper.java   |   1 +
 2 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 0448037..3ce7f0a 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,55 +16,158 @@
  */
 package org.apache.camel.impl.engine;
 
+import java.util.LinkedList;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.spi.ReactiveExecutor;
-import org.apache.camel.support.ReactiveHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ReactiveExecutor}.
  */
 public class DefaultReactiveExecutor implements ReactiveExecutor {
 
-    // TODO: ReactiveHelper code should be moved here and not static
-    // ppl should use the SPI interface
+    // TODO: StaticServiceSupport so we can init/start/stop
+    // TODO: Add mbean info so we can get details
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class);
+
+    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new);
 
     @Override
     public void scheduleMain(Runnable runnable) {
-        ReactiveHelper.scheduleMain(runnable);
+        workers.get().schedule(runnable, true, true, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable) {
-        ReactiveHelper.scheduleSync(runnable);
+        workers.get().schedule(runnable, true, true, true);
     }
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
-        ReactiveHelper.scheduleMain(runnable, description);
+        workers.get().schedule(describe(runnable, description), true, true, false);
     }
 
     @Override
     public void schedule(Runnable runnable) {
-        ReactiveHelper.schedule(runnable);
+        workers.get().schedule(runnable, true, false, false);;
     }
 
     @Override
     public void schedule(Runnable runnable, String description) {
-        ReactiveHelper.schedule(runnable, description);
+        workers.get().schedule(describe(runnable, description), true, false, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
-        ReactiveHelper.scheduleSync(runnable, description);
+        workers.get().schedule(describe(runnable, description), false, true, true);
     }
 
     @Override
     public boolean executeFromQueue() {
-        return ReactiveHelper.executeFromQueue();
+        return workers.get().executeFromQueue();
     }
 
     @Override
     public void callback(AsyncCallback callback) {
-        ReactiveHelper.callback(callback);
+        schedule(new Runnable() {
+            @Override
+            public void run() {
+                callback.done(false);
+            }
+            @Override
+            public String toString() {
+                return "Callback[" + callback + "]";
+            }
+        });
     }
+
+    private static Runnable describe(Runnable runnable, String description) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                runnable.run();
+            }
+            @Override
+            public String toString() {
+                return description;
+            }
+        };
+    }
+
+    private static class Worker {
+
+        private volatile LinkedList<Runnable> queue = new LinkedList<>();
+        private volatile LinkedList<LinkedList<Runnable>> back;
+        private volatile boolean running;
+
+        public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
+            if (main) {
+                if (!queue.isEmpty()) {
+                    if (back == null) {
+                        back = new LinkedList<>();
+                    }
+                    back.push(queue);
+                    queue = new LinkedList<>();
+                }
+            }
+            if (first) {
+                queue.addFirst(runnable);
+            } else {
+                queue.addLast(runnable);
+            }
+            if (!running || sync) {
+                running = true;
+//                Thread thread = Thread.currentThread();
+//                String name = thread.getName();
+                try {
+                    for (;;) {
+                        final Runnable polled = queue.poll();
+                        if (polled == null) {
+                            if (back != null && !back.isEmpty()) {
+                                queue = back.poll();
+                                continue;
+                            } else {
+                                break;
+                            }
+                        }
+                        try {
+//                            thread.setName(name + " - " + polled.toString());
+                            polled.run();
+                        } catch (Throwable t) {
+                            LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
+                        }
+                    }
+                } finally {
+//                    thread.setName(name);
+                    running = false;
+                }
+            } else {
+                LOG.debug("Queuing reactive work: {}", runnable);
+            }
+        }
+
+        public boolean executeFromQueue() {
+            final Runnable polled = queue != null ? queue.poll() : null;
+            if (polled == null) {
+                return false;
+            }
+            Thread thread = Thread.currentThread();
+            String name = thread.getName();
+            try {
+                thread.setName(name + " - " + polled.toString());
+                polled.run();
+            } catch (Throwable t) {
+                // should not happen
+                LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
+            } finally {
+                thread.setName(name);
+            }
+            return true;
+        }
+
+    }
+
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
index 06db5e6..9541371 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A basic reactive engine that uses a worker pool to process tasks.
  */
+@Deprecated
 public final class ReactiveHelper {
 
     private static final ThreadLocal<Worker> WORKERS = ThreadLocal.withInitial(Worker::new);


[camel] 11/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 26a54578b2d6e18f3038f6c2f7e2c680063fde97
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 10:07:09 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/{reacitve => reactive/vertx}/VertXReactiveExecutor.java       | 2 +-
 .../main/resources/META-INF/services/org/apache/camel/reactive-executor | 2 +-
 .../src/test/java/org/apache/camel/reactive/SimpleMockTest.java         | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reacitve/VertXReactiveExecutor.java b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
similarity index 98%
rename from core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reacitve/VertXReactiveExecutor.java
rename to core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 5e2f507..298875c 100644
--- a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reacitve/VertXReactiveExecutor.java
+++ b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.reacitve;
+package org.apache.camel.reactive.vertx;
 
 import io.vertx.core.Vertx;
 import org.apache.camel.StaticService;
diff --git a/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor b/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
index 68975fc..554c5d6 100644
--- a/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
+++ b/core/camel-reactive-executor-vertx/src/main/resources/META-INF/services/org/apache/camel/reactive-executor
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.reactive.VertXReactiveExecutor
+class=org.apache.camel.reactive.vertx.VertXReactiveExecutor
diff --git a/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
index 924952f..3383e73 100644
--- a/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
+++ b/core/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.reactive;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.reacitve.VertXReactiveExecutor;
+import org.apache.camel.reactive.vertx.VertXReactiveExecutor;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 


[camel] 09/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c926451c5270a54772582da2ab3a7eb80cf64b5e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 09:10:56 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     | 47 ++++++++++++++++++++--
 .../impl/engine/ReactiveExecutorResolver.java      |  4 +-
 2 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 37744fb..f61b012 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -23,30 +23,69 @@ import org.apache.camel.AsyncCallback;
  */
 public interface ReactiveExecutor {
 
-    // TODO: Add javadoc
-    // TODO: Better name
-
+    /**
+     * Schedules the task to be run
+     *
+     * @param runnable    the task
+     */
     default void schedule(Runnable runnable) {
         schedule(runnable, null);
     }
 
+    /**
+     * Schedules the task to be run
+     *
+     * @param runnable    the task
+     * @param description a human readable description for logging purpose
+     */
     void schedule(Runnable runnable, String description);
 
+    /**
+     * Schedules the task to be prioritized and run asap
+     *
+     * @param runnable    the task
+     */
     default void scheduleMain(Runnable runnable) {
         scheduleMain(runnable, null);
     }
 
+    /**
+     * Schedules the task to be prioritized and run asap
+     *
+     * @param runnable    the task
+     * @param description a human readable description for logging purpose
+     */
     void scheduleMain(Runnable runnable, String description);
 
+    /**
+     * Schedules the task to run synchronously
+     *
+     * @param runnable    the task
+     */
     default void scheduleSync(Runnable runnable) {
         scheduleSync(runnable, null);
     }
 
+    /**
+     * Schedules the task to run synchronously
+     *
+     * @param runnable    the task
+     * @param description a human readable description for logging purpose
+     */
     void scheduleSync(Runnable runnable, String description);
 
-    // TODO: Can we make this so we dont need an method on this interface as its only used once
+    /**
+     * Executes the next task
+     *
+     * @return true if a task was executed or false if no more pending tasks
+     */
     boolean executeFromQueue();
 
+    /**
+     * Schedules the callback to be run
+     *
+     * @param callback    the callback
+     */
     default void callback(AsyncCallback callback) {
         schedule(new Runnable() {
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
index 3a78412..b5a2d03 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
@@ -40,14 +40,14 @@ public class ReactiveExecutorResolver {
         // use factory finder to find a custom implementations
         Class<?> type = null;
         try {
-            type = findFactory("reactive-executor-factory", context);
+            type = findFactory("reactive-executor", context);
         } catch (Exception e) {
             // ignore
         }
 
         if (type != null) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Found ReactiveExecutor: {} via: {}{}", type.getName(), factoryFinder.getResourcePath(), "reactive-executor-factory");
+                LOG.debug("Found ReactiveExecutor: {} via: {}{}", type.getName(), factoryFinder.getResourcePath(), "reactive-executor");
             }
             if (ReactiveExecutor.class.isAssignableFrom(type)) {
                 ReactiveExecutor answer = (ReactiveExecutor) context.getInjector().newInstance(type, false);


[camel] 07/18: BaseExecutorServiceManager should be abstract

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0857c76b6bc5d17b6ef80aa1be73469ffdc70228
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 14:35:58 2019 +0200

    BaseExecutorServiceManager should be abstract
---
 .../org/apache/camel/impl/engine/BaseExecutorServiceManager.java     | 5 ++---
 .../java/org/apache/camel/impl/DefaultExecutorServiceManager.java    | 3 +++
 .../org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java    | 3 ++-
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
index 67c4e56..5dfd1e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
@@ -52,10 +52,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Default {@link org.apache.camel.spi.ExecutorServiceManager}.
- *
+ * Base {@link org.apache.camel.spi.ExecutorServiceManager} which can be used for implementations
  */
-public class BaseExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
+public abstract class BaseExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
     private static final Logger LOG = LoggerFactory.getLogger(BaseExecutorServiceManager.class);
 
     private final CamelContext camelContext;
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
index b9eb1a1..a75bcbc 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
@@ -26,6 +26,9 @@ import org.apache.camel.model.OptionalIdentifiedDefinition;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
 
+/**
+ * Default {@link org.apache.camel.spi.ExecutorServiceManager}.
+ */
 public class DefaultExecutorServiceManager extends BaseExecutorServiceManager {
 
     public DefaultExecutorServiceManager(CamelContext camelContext) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
index f9995a0..59de11f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CustomThreadPoolFactoryTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.impl.DefaultExecutorServiceManager;
 import org.apache.camel.support.DefaultThreadPoolFactory;
 import org.junit.Test;
 
@@ -36,7 +37,7 @@ public class CustomThreadPoolFactoryTest extends ContextTestSupport {
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
-        BaseExecutorServiceManager executorServiceManager = new BaseExecutorServiceManager(context);
+        DefaultExecutorServiceManager executorServiceManager = new DefaultExecutorServiceManager(context);
         executorServiceManager.setThreadPoolFactory(factory);
         context.setExecutorServiceManager(executorServiceManager);
         return context;


[camel] 04/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f96f3ab5c66adfd6cfc94b1c8dc22b5c85aab671
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 13:57:26 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/impl/engine/AbstractCamelContext.java    |  5 +-
 .../camel/impl/engine/DefaultReactiveExecutor.java |  3 +
 .../impl/engine/ReactiveExecutorResolver.java      | 73 ++++++++++++++++++++++
 .../org/apache/camel/impl/DefaultCamelContext.java |  3 +-
 4 files changed, 81 insertions(+), 3 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 4606bc3..01f5d6e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -3256,6 +3256,8 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
     }
 
     protected void doStartEagerServices() {
+        getFactoryFinderResolver();
+        getDefaultFactoryFinder();
         getComponentResolver();
         getDataFormatResolver();
         getManagementStrategy();
@@ -3264,8 +3266,6 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getNodeIdFactory();
         getProcessorFactory();
         getMessageHistoryFactory();
-        getFactoryFinderResolver();
-        getDefaultFactoryFinder();
         getStreamCachingStrategy();
         getModelJAXBContextFactory();
         getUuidGenerator();
@@ -3274,6 +3274,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getBeanProxyFactory();
         getBeanProcessorFactory();
         getBeanPostProcessor();
+        getReactiveExecutor();
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index d700e3c..0448037 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -25,6 +25,9 @@ import org.apache.camel.support.ReactiveHelper;
  */
 public class DefaultReactiveExecutor implements ReactiveExecutor {
 
+    // TODO: ReactiveHelper code should be moved here and not static
+    // ppl should use the SPI interface
+
     @Override
     public void scheduleMain(Runnable runnable) {
         ReactiveHelper.scheduleMain(runnable);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
new file mode 100644
index 0000000..3a78412
--- /dev/null
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/ReactiveExecutorResolver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.io.IOException;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.spi.FactoryFinder;
+import org.apache.camel.spi.ReactiveExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory resolver to create the {@link org.apache.camel.spi.ReactiveExecutor} to be used.
+ */
+public class ReactiveExecutorResolver {
+
+    public static final String RESOURCE_PATH = "META-INF/services/org/apache/camel/";
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReactiveExecutorResolver.class);
+
+    private FactoryFinder factoryFinder;
+
+    public ReactiveExecutor resolve(CamelContext context) {
+        // use factory finder to find a custom implementations
+        Class<?> type = null;
+        try {
+            type = findFactory("reactive-executor-factory", context);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        if (type != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Found ReactiveExecutor: {} via: {}{}", type.getName(), factoryFinder.getResourcePath(), "reactive-executor-factory");
+            }
+            if (ReactiveExecutor.class.isAssignableFrom(type)) {
+                ReactiveExecutor answer = (ReactiveExecutor) context.getInjector().newInstance(type, false);
+                LOG.debug("Detected and using ReactiveExecutor: {}", answer);
+                return answer;
+            } else {
+                throw new IllegalArgumentException("Type is not a ReactiveExecutor implementation. Found: " + type.getName());
+            }
+        }
+
+        // fallback to default
+        LOG.debug("Creating default ReactiveExecutor");
+        return new DefaultReactiveExecutor();
+    }
+
+    private Class<?> findFactory(String name, CamelContext context) throws ClassNotFoundException, IOException {
+        if (factoryFinder == null) {
+            factoryFinder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(RESOURCE_PATH);
+        }
+        return factoryFinder.findClass(name);
+    }
+
+}
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 1cce2b8..ee0a939 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -53,6 +53,7 @@ import org.apache.camel.impl.engine.DefaultUnitOfWorkFactory;
 import org.apache.camel.impl.engine.DefaultUuidGenerator;
 import org.apache.camel.impl.engine.EndpointKey;
 import org.apache.camel.impl.engine.HeadersMapFactoryResolver;
+import org.apache.camel.impl.engine.ReactiveExecutorResolver;
 import org.apache.camel.impl.engine.RestRegistryFactoryResolver;
 import org.apache.camel.impl.engine.ServicePool;
 import org.apache.camel.impl.engine.WebSpherePackageScanClassResolver;
@@ -308,6 +309,6 @@ public class DefaultCamelContext extends AbstractModelCamelContext {
     }
 
     protected ReactiveExecutor createReactiveExecutor() {
-        return new DefaultReactiveExecutor();
+        return new ReactiveExecutorResolver().resolve(this);
     }
 }


[camel] 02/18: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e1b778d1b029316587396ae13c304c2db48f2eb7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 12:54:37 2019 +0200

    camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../main/java/org/apache/camel/CamelContext.java   |  8 +++
 .../org/apache/camel/spi/ReactiveExecutor.java     | 41 +++++++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    | 19 +++++++
 .../camel/impl/engine/DefaultReactiveExecutor.java | 61 ++++++++++++++++++++++
 .../org/apache/camel/impl/DefaultCamelContext.java |  6 +++
 5 files changed, 135 insertions(+)

diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index c225e65..325cd06 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -37,6 +37,7 @@ import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.PropertiesComponent;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RestRegistry;
@@ -1228,4 +1229,11 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
      */
     void setHeadersMapFactory(HeadersMapFactory factory);
 
+    ReactiveExecutor getReactiveExecutor();
+
+    /**
+     * Sets a custom {@link ReactiveExecutor} to be used.
+     */
+    void setReactiveExecutor(ReactiveExecutor reactiveExecutor);
+
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
new file mode 100644
index 0000000..8987bd3
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * SPI to plug in different reactive engines in the Camel routing engine.
+ */
+public interface ReactiveExecutor {
+
+    // TODO: Add javadoc
+    // TODO: Better name
+
+    void scheduleMain(Runnable runnable);
+
+    void scheduleSync(Runnable runnable);
+
+    void scheduleMain(Runnable runnable, String description);
+
+    void schedule(Runnable runnable);
+
+    void schedule(Runnable runnable, String description);
+
+    void scheduleSync(Runnable runnable, String description);
+
+    boolean executeFromQueue();
+
+}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 580818a..4606bc3 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -119,6 +119,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RestRegistry;
@@ -216,6 +217,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
 
     private final Object lock = new Object();
     private volatile CamelContextNameStrategy nameStrategy;
+    private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
     private volatile Registry registry;
     private volatile TypeConverter typeConverter;
@@ -3896,6 +3898,21 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         this.headersMapFactory = doAddService(headersMapFactory);
     }
 
+    public ReactiveExecutor getReactiveExecutor() {
+        if (reactiveExecutor == null) {
+            synchronized (lock) {
+                if (reactiveExecutor == null) {
+                    setReactiveExecutor(createReactiveExecutor());
+                }
+            }
+        }
+        return reactiveExecutor;
+    }
+
+    public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) {
+        this.reactiveExecutor = reactiveExecutor;
+    }
+
     @Override
     public DeferServiceFactory getDeferServiceFactory() {
         return deferServiceFactory;
@@ -3974,6 +3991,8 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         }
     }
 
+    protected abstract ReactiveExecutor createReactiveExecutor();
+
     protected abstract StreamCachingStrategy createStreamCachingStrategy();
 
     protected abstract TypeConverter createTypeConverter();
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
new file mode 100644
index 0000000..6a9473b
--- /dev/null
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.spi.ReactiveExecutor;
+import org.apache.camel.support.ReactiveHelper;
+
+/**
+ * Default {@link ReactiveExecutor}.
+ */
+public class DefaultReactiveExecutor implements ReactiveExecutor {
+
+    @Override
+    public void scheduleMain(Runnable runnable) {
+        ReactiveHelper.scheduleMain(runnable);
+    }
+
+    @Override
+    public void scheduleSync(Runnable runnable) {
+        ReactiveHelper.scheduleSync(runnable);
+    }
+
+    @Override
+    public void scheduleMain(Runnable runnable, String description) {
+        ReactiveHelper.scheduleMain(runnable, description);
+    }
+
+    @Override
+    public void schedule(Runnable runnable) {
+        ReactiveHelper.schedule(runnable);
+    }
+
+    @Override
+    public void schedule(Runnable runnable, String description) {
+        ReactiveHelper.schedule(runnable, description);
+    }
+
+    @Override
+    public void scheduleSync(Runnable runnable, String description) {
+        ReactiveHelper.scheduleSync(runnable, description);
+    }
+
+    @Override
+    public boolean executeFromQueue() {
+        return ReactiveHelper.executeFromQueue();
+    }
+}
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 6b7ac7e..1cce2b8 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -45,6 +45,7 @@ import org.apache.camel.impl.engine.DefaultMessageHistoryFactory;
 import org.apache.camel.impl.engine.DefaultNodeIdFactory;
 import org.apache.camel.impl.engine.DefaultPackageScanClassResolver;
 import org.apache.camel.impl.engine.DefaultProcessorFactory;
+import org.apache.camel.impl.engine.DefaultReactiveExecutor;
 import org.apache.camel.impl.engine.DefaultRouteController;
 import org.apache.camel.impl.engine.DefaultShutdownStrategy;
 import org.apache.camel.impl.engine.DefaultStreamCachingStrategy;
@@ -81,6 +82,7 @@ import org.apache.camel.spi.ModelJAXBContextFactory;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RestRegistryFactory;
 import org.apache.camel.spi.RouteController;
@@ -304,4 +306,8 @@ public class DefaultCamelContext extends AbstractModelCamelContext {
     protected StreamCachingStrategy createStreamCachingStrategy() {
         return new DefaultStreamCachingStrategy();
     }
+
+    protected ReactiveExecutor createReactiveExecutor() {
+        return new DefaultReactiveExecutor();
+    }
 }


[camel] 08/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 35805f9f612d21240e7359d5632038c55f388cff
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 15:35:37 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../camel/impl/engine/AbstractCamelContext.java    |  2 +-
 .../camel/impl/engine/DefaultReactiveExecutor.java | 47 ++++++++++++++-
 .../camel/impl/MultipleLifecycleStrategyTest.java  |  2 +-
 core/camel-management-impl/pom.xml                 | 16 +++++
 .../ManagedDefaultReactiveExecutorTest.java        | 70 ++++++++++++++++++++++
 .../management/ManagedNonManagedServiceTest.java   |  4 +-
 ...edProducerRouteAddRemoveRegisterAlwaysTest.java |  2 +-
 .../management/ManagedRouteAddRemoveTest.java      |  2 +-
 8 files changed, 137 insertions(+), 8 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 0987b86..46217c2 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -3247,6 +3247,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getProducerServicePool();
         getPollingConsumerServicePool();
         getRestRegistryFactory();
+        getReactiveExecutor();
 
         if (isTypeConverterStatisticsEnabled() != null) {
             getTypeConverterRegistry().getStatistics().setStatisticsEnabled(isTypeConverterStatisticsEnabled());
@@ -3275,7 +3276,6 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         getBeanProxyFactory();
         getBeanProcessorFactory();
         getBeanPostProcessor();
-        getReactiveExecutor();
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index e094999..350e189 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -17,9 +17,14 @@
 package org.apache.camel.impl.engine;
 
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.StaticService;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
@@ -28,13 +33,23 @@ import org.slf4j.LoggerFactory;
 /**
  * Default {@link ReactiveExecutor}.
  */
+@ManagedResource(description = "Managed ReactiveExecutor")
 public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService {
 
-    // TODO: Add mbean info so we can get details
-
     private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class);
 
-    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new);
+    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(new Supplier<Worker>() {
+        @Override
+        public Worker get() {
+            createdWorkers.incrementAndGet();
+            return new Worker(DefaultReactiveExecutor.this);
+        }
+    });
+
+    // use for statistics so we have insights at runtime
+    private final AtomicInteger createdWorkers = new AtomicInteger();
+    private final AtomicInteger runningWorkers = new AtomicInteger();
+    private final AtomicLong pendingTasks = new AtomicLong();
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
@@ -65,6 +80,21 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
         return workers.get().executeFromQueue();
     }
 
+    @ManagedAttribute(description = "Number of created workers")
+    public int getCreatedWorkers() {
+        return createdWorkers.get();
+    }
+
+    @ManagedAttribute(description = "Number of running workers")
+    public int getRunningWorkers() {
+        return runningWorkers.get();
+    }
+
+    @ManagedAttribute(description = "Number of pending tasks")
+    public long getPendingTasks() {
+        return pendingTasks.get();
+    }
+
     @Override
     public void callback(AsyncCallback callback) {
         schedule(new Runnable() {
@@ -104,10 +134,15 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
 
     private static class Worker {
 
+        private final DefaultReactiveExecutor executor;
         private volatile LinkedList<Runnable> queue = new LinkedList<>();
         private volatile LinkedList<LinkedList<Runnable>> back;
         private volatile boolean running;
 
+        public Worker(DefaultReactiveExecutor executor) {
+            this.executor = executor;
+        }
+
         void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
             if (main) {
                 if (!queue.isEmpty()) {
@@ -120,11 +155,14 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
             }
             if (first) {
                 queue.addFirst(runnable);
+                executor.pendingTasks.incrementAndGet();
             } else {
                 queue.addLast(runnable);
+                executor.pendingTasks.incrementAndGet();
             }
             if (!running || sync) {
                 running = true;
+                executor.runningWorkers.incrementAndGet();
 //                Thread thread = Thread.currentThread();
 //                String name = thread.getName();
                 try {
@@ -139,6 +177,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                             }
                         }
                         try {
+                            executor.pendingTasks.decrementAndGet();
 //                            thread.setName(name + " - " + polled.toString());
                             polled.run();
                         } catch (Throwable t) {
@@ -148,6 +187,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                 } finally {
 //                    thread.setName(name);
                     running = false;
+                    executor.runningWorkers.decrementAndGet();
                 }
             } else {
                 LOG.debug("Queuing reactive work: {}", runnable);
@@ -162,6 +202,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
             Thread thread = Thread.currentThread();
             String name = thread.getName();
             try {
+                executor.pendingTasks.decrementAndGet();
                 thread.setName(name + " - " + polled.toString());
                 polled.run();
             } catch (Throwable t) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
index caffd55..4c0ffc8 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
@@ -52,7 +52,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport {
         List<String> expectedEvents = Arrays.asList("onContextStart",
             "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
             "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
-            "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
+            "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
         
         assertEquals(expectedEvents, dummy1.getEvents());
         assertEquals(expectedEvents, dummy2.getEvents());
diff --git a/core/camel-management-impl/pom.xml b/core/camel-management-impl/pom.xml
index 3d6fef6..58aca2f 100644
--- a/core/camel-management-impl/pom.xml
+++ b/core/camel-management-impl/pom.xml
@@ -109,6 +109,22 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <!-- logging for testing -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
new file mode 100644
index 0000000..a6cb317
--- /dev/null
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class ManagedDefaultReactiveExecutorTest extends ManagementTestSupport {
+
+    @Test
+    public void testReactiveExecutor() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                    .to("log:foo")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // check mbeans
+                            MBeanServer mbeanServer = getMBeanServer();
+
+                            ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultReactiveExecutor");
+                            assertTrue("Should be registered", mbeanServer.isRegistered(on));
+
+                            // should be 1 running
+                            Integer running = (Integer) mbeanServer.getAttribute(on, "RunningWorkers");
+                            assertEquals(1, running.intValue());
+
+                            // should be 0 pending
+                            Long pending = (Long) mbeanServer.getAttribute(on, "PendingTasks");
+                            assertEquals(0, pending.intValue());
+                        }
+                    })
+                    .to("log:bar")
+                    .to("mock:result");
+            }
+        };
+    }
+
+
+}
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
index 6962165..3a249b5 100644
--- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 
 public class ManagedNonManagedServiceTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Test
     public void testService() throws Exception {
@@ -38,6 +38,8 @@ public class ManagedNonManagedServiceTest extends ManagementTestSupport {
             return;
         }
 
+        template.sendBody("direct:start", "Hello World");
+
         // must enable always as CamelContext has been started
         // and we add the service manually below
         context.getManagementStrategy().getManagementAgent().setRegisterAlways(true);
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index 6c7cd92..4bb974c 100644
--- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementTestSupport {
 
-    private int services = 11;
+    private int services = 12;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index fff6e2d..339ec5f 100644
--- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -33,7 +33,7 @@ import org.junit.Test;
  */
 public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
     
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {


[camel] 17/18: CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0a181b3380aed1d97aa1a5b73c3bb954b5d036bd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jun 13 14:02:09 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../core/xml/AbstractCamelContextFactoryBean.java  |  6 +++
 .../camel/main/DefaultConfigurationConfigurer.java | 44 ++++++++++++++--------
 2 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index bde2086..50b0adc 100644
--- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -102,6 +102,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanFilter;
 import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RouteController;
 import org.apache.camel.spi.RoutePolicyFactory;
@@ -1166,5 +1167,10 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
             LOG.info("Using custom MessageHistoryFactory: {}", messageHistoryFactory);
             getContext().setMessageHistoryFactory(messageHistoryFactory);
         }
+        ReactiveExecutor reactiveExecutor = getBeanForType(ReactiveExecutor.class);
+        if (reactiveExecutor != null) {
+            // already logged in CamelContext
+            getContext().setReactiveExecutor(reactiveExecutor);
+        }
     }
 }
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 3f23d2d..9568594 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -35,22 +35,31 @@ import org.apache.camel.model.Model;
 import org.apache.camel.processor.interceptor.BacklogTracer;
 import org.apache.camel.processor.interceptor.HandleFault;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.ClassResolver;
+import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EventFactory;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.LogListener;
 import org.apache.camel.spi.ManagementObjectNameStrategy;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.ModelJAXBContextFactory;
+import org.apache.camel.spi.NodeIdFactory;
+import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RouteController;
 import org.apache.camel.spi.RoutePolicyFactory;
 import org.apache.camel.spi.RuntimeEndpointRegistry;
 import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.spi.UnitOfWorkFactory;
 import org.apache.camel.spi.UuidGenerator;
@@ -173,24 +182,34 @@ public final class DefaultConfigurationConfigurer {
         registerPropertyForBeanType(registry, EventFactory.class, managementStrategy::setEventFactory);
         registerPropertyForBeanType(registry, UnitOfWorkFactory.class, camelContext.adapt(ExtendedCamelContext.class)::setUnitOfWorkFactory);
         registerPropertyForBeanType(registry, RuntimeEndpointRegistry.class, camelContext::setRuntimeEndpointRegistry);
+        registerPropertyForBeanType(registry, ModelJAXBContextFactory.class, camelContext.adapt(ExtendedCamelContext.class)::setModelJAXBContextFactory);
+        registerPropertyForBeanType(registry, ClassResolver.class, camelContext::setClassResolver);
+        registerPropertyForBeanType(registry, FactoryFinderResolver.class, camelContext.adapt(ExtendedCamelContext.class)::setFactoryFinderResolver);
+        registerPropertyForBeanType(registry, RouteController.class, camelContext::setRouteController);
+        registerPropertyForBeanType(registry, UuidGenerator.class, camelContext::setUuidGenerator);
+        registerPropertyForBeanType(registry, ExecutorServiceManager.class, camelContext::setExecutorServiceManager);
+        registerPropertyForBeanType(registry, ThreadPoolFactory.class, camelContext.getExecutorServiceManager()::setThreadPoolFactory);
+        registerPropertyForBeanType(registry, ProcessorFactory.class, camelContext.adapt(ExtendedCamelContext.class)::setProcessorFactory);
+        registerPropertyForBeanType(registry, Debugger.class, camelContext::setDebugger);
+        registerPropertyForBeanType(registry, NodeIdFactory.class, camelContext.adapt(ExtendedCamelContext.class)::setNodeIdFactory);
+        registerPropertyForBeanType(registry, MessageHistoryFactory.class, camelContext::setMessageHistoryFactory);
+        registerPropertyForBeanType(registry, ReactiveExecutor.class, camelContext::setReactiveExecutor);
+        registerPropertyForBeanType(registry, ShutdownStrategy.class, camelContext::setShutdownStrategy);
 
         registerPropertiesForBeanTypes(registry, TypeConverters.class, camelContext.getTypeConverterRegistry()::addTypeConverters);
+        registerPropertiesForBeanTypes(registry, EndpointStrategy.class, camelContext.adapt(ExtendedCamelContext.class)::registerEndpointCallback);
+        registerPropertiesForBeanTypes(registry, CamelClusterService.class, addServiceToContext(camelContext));
+        registerPropertiesForBeanTypes(registry, RoutePolicyFactory.class, camelContext::addRoutePolicyFactory);
 
         final Predicate<EventNotifier> containsEventNotifier = managementStrategy.getEventNotifiers()::contains;
         registerPropertiesForBeanTypesWithCondition(registry, EventNotifier.class, containsEventNotifier.negate(), managementStrategy::addEventNotifier);
 
-        registerPropertiesForBeanTypes(registry, EndpointStrategy.class, camelContext.adapt(ExtendedCamelContext.class)::registerEndpointCallback);
-
-        registerPropertyForBeanType(registry, ShutdownStrategy.class, camelContext::setShutdownStrategy);
-
         final Predicate<InterceptStrategy> containsInterceptStrategy = camelContext.adapt(ExtendedCamelContext.class).getInterceptStrategies()::contains;
         registerPropertiesForBeanTypesWithCondition(registry, InterceptStrategy.class, containsInterceptStrategy.negate(), camelContext.adapt(ExtendedCamelContext.class)::addInterceptStrategy);
 
         final Predicate<LifecycleStrategy> containsLifecycleStrategy = camelContext.getLifecycleStrategies()::contains;
         registerPropertiesForBeanTypesWithCondition(registry, LifecycleStrategy.class, containsLifecycleStrategy.negate(), camelContext::addLifecycleStrategy);
 
-        registerPropertiesForBeanTypes(registry, CamelClusterService.class, addServiceToContext(camelContext));
-
         // service registry
         Map<String, ServiceRegistry> serviceRegistries = registry.findByTypeWithName(ServiceRegistry.class);
         if (serviceRegistries != null && !serviceRegistries.isEmpty()) {
@@ -206,14 +225,13 @@ public final class DefaultConfigurationConfigurer {
             }
         }
 
-        registerPropertiesForBeanTypes(registry, RoutePolicyFactory.class, camelContext::addRoutePolicyFactory);
-
-        // add SSL context parameters
+        // SSL context parameters
         GlobalSSLContextParametersSupplier sslContextParametersSupplier = getSingleBeanOfType(registry, GlobalSSLContextParametersSupplier.class);
         if (sslContextParametersSupplier != null) {
             camelContext.setSSLContextParameters(sslContextParametersSupplier.get());
         }
-        // Health check registry
+
+        // health check
         HealthCheckRegistry healthCheckRegistry = getSingleBeanOfType(registry, HealthCheckRegistry.class);
         if (healthCheckRegistry != null) {
             healthCheckRegistry.setCamelContext(camelContext);
@@ -223,18 +241,12 @@ public final class DefaultConfigurationConfigurer {
             healthCheckRegistry = HealthCheckRegistry.get(camelContext);
             healthCheckRegistry.setCamelContext(camelContext);
         }
-
         registerPropertiesForBeanTypes(registry, HealthCheckRepository.class, healthCheckRegistry::addRepository);
-
         registerPropertyForBeanType(registry, HealthCheckService.class, addServiceToContext(camelContext));
-        registerPropertyForBeanType(registry, RouteController.class, camelContext::setRouteController);
-        registerPropertyForBeanType(registry, UuidGenerator.class, camelContext::setUuidGenerator);
 
         final Predicate<LogListener> containsLogListener = camelContext.adapt(ExtendedCamelContext.class).getLogListeners()::contains;
         registerPropertiesForBeanTypesWithCondition(registry, LogListener.class, containsLogListener.negate(), camelContext.adapt(ExtendedCamelContext.class)::addLogListener);
 
-        registerPropertyForBeanType(registry, ExecutorServiceManager.class, camelContext::setExecutorServiceManager);
-
         // set the default thread pool profile if defined
         initThreadPoolProfiles(registry, camelContext);
     }