You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/03/02 08:15:40 UTC

[camel] branch master updated: CAMEL-14568 Fixed reactive streams

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new a69abb0  CAMEL-14568 Fixed reactive streams
a69abb0 is described below

commit a69abb0422cba1ba254ab2faa54e10076e06bda7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 2 09:05:38 2020 +0100

    CAMEL-14568 Fixed reactive streams
---
 .../ReactiveStreamsComponentConfigurer.java        | 10 +++-
 .../reactive/streams/reactive-streams.json         |  5 +-
 .../src/main/docs/reactive-streams-component.adoc  |  7 ++-
 .../reactive/streams/ReactiveStreamsComponent.java | 62 +++++++++++++++++++---
 .../engine/ReactiveStreamsEngineConfiguration.java |  7 +--
 .../ReactiveStreamsComponentBuilderFactory.java    | 54 +++++++++++++++++--
 6 files changed, 121 insertions(+), 24 deletions(-)

diff --git a/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java b/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java
index 45af20a..842a9ae 100644
--- a/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java
+++ b/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java
@@ -21,12 +21,18 @@ public class ReactiveStreamsComponentConfigurer extends PropertyConfigurerSuppor
         case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
-        case "internalengineconfiguration":
-        case "internalEngineConfiguration": target.setInternalEngineConfiguration(property(camelContext, org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "reactivestreamsengineconfiguration":
+        case "reactiveStreamsEngineConfiguration": target.setReactiveStreamsEngineConfiguration(property(camelContext, org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration.class, value)); return true;
         case "servicetype":
         case "serviceType": target.setServiceType(property(camelContext, java.lang.String.class, value)); return true;
+        case "threadpoolmaxsize":
+        case "threadPoolMaxSize": target.setThreadPoolMaxSize(property(camelContext, int.class, value)); return true;
+        case "threadpoolminsize":
+        case "threadPoolMinSize": target.setThreadPoolMinSize(property(camelContext, int.class, value)); return true;
+        case "threadpoolname":
+        case "threadPoolName": target.setThreadPoolName(property(camelContext, java.lang.String.class, value)); return true;
         default: return false;
         }
     }
diff --git a/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json b/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json
index 24bc99d..9bda3be 100644
--- a/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json
+++ b/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json
@@ -19,11 +19,14 @@
     "version": "3.2.0-SNAPSHOT"
   },
   "componentProperties": {
+    "threadPoolMaxSize": { "kind": "property", "displayName": "Thread Pool Max Size", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "10", "description": "The maximum number of threads used by the reactive streams internal engine." },
+    "threadPoolMinSize": { "kind": "property", "displayName": "Thread Pool Min Size", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The minimum number of threads used by the reactive streams internal engine." },
+    "threadPoolName": { "kind": "property", "displayName": "Thread Pool Name", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "CamelReactiveStreamsWorker", "description": "The name of the thread pool used by the reactive streams internal engine." },
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." },
     "backpressureStrategy": { "kind": "property", "displayName": "Backpressure Strategy", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy", "enum": [ "BUFFER", "OLDEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "BUFFER", "description": "The backpressure strategy to use when pushing events to a slow subscriber." },
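     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." },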
     "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
-    "internalEngineConfiguration": { "kind": "property", "displayName": "Internal Engine Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration", "deprecated": false, "secret": false, "description": "Configures the internal engine for Reactive Streams." },
+    "reactiveStreamsEngineConfiguration": { "kind": "property", "displayName": "Reactive Streams Engine Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration", "deprecated": false, "secret": false, "description": "To use an existing reactive stream engine configuration." },
     "serviceType": { "kind": "property", "displayName": "Service Type", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Set the type of the underlying reactive streams implementation to use. The implementation is looked up from the registry or using a ServiceLoader, the default implementation is DefaultCamelReactiveStreamsService" }
   },
   "properties": {
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 1238d82..eb337b6 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -49,18 +49,21 @@ external stream processing systems.
 
 
 // component options: START
-The Reactive Streams component supports 6 options, which are listed below.
+The Reactive Streams component supports 9 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *threadPoolMaxSize* (common) | The maximum number of threads used by the reactive streams internal engine. | 10 | int
+| *threadPoolMinSize* (common) | The minimum number of threads used by the reactive streams internal engine. |  | int
+| *threadPoolName* (common) | The name of the thread pool used by the reactive streams internal engine. | CamelReactiveStreamsWorker | String
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *backpressureStrategy* (producer) | The backpressure strategy to use when pushing events to a slow subscriber. The value can be one of: BUFFER, OLDEST, LATEST | BUFFER | ReactiveStreamsBackpressureStrategy
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean
 | *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
-| *internalEngineConfiguration* (advanced) | Configures the internal engine for Reactive Streams. |  | ReactiveStreamsEngineConfiguration
+| *reactiveStreamsEngine Configuration* (advanced) | To use an existing reactive stream engine configuration. |  | ReactiveStreamsEngineConfiguration
 | *serviceType* (advanced) | Set the type of the underlying reactive streams implementation to use. The implementation is looked up from the registry or using a ServiceLoader, the default implementation is DefaultCamelReactiveStreamsService |  | String
 |===
 // component options: END
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
index 417c4d4..0104033 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
@@ -32,12 +32,18 @@ import org.apache.camel.support.service.ServiceHelper;
  */
 @Component("reactive-streams")
 public class ReactiveStreamsComponent extends DefaultComponent {
-    @Metadata(label = "advanced")
-    private ReactiveStreamsEngineConfiguration internalEngineConfiguration = new ReactiveStreamsEngineConfiguration();
+    @Metadata(label = "common", defaultValue = "CamelReactiveStreamsWorker")
+    private String threadPoolName = "CamelReactiveStreamsWorker";
+    @Metadata(label = "common")
+    private int threadPoolMinSize;
+    @Metadata(label = "common", defaultValue = "10")
+    private int threadPoolMaxSize = 10;
     @Metadata(label = "producer", defaultValue = "BUFFER")
     private ReactiveStreamsBackpressureStrategy backpressureStrategy = ReactiveStreamsBackpressureStrategy.BUFFER;
     @Metadata(label = "advanced")
     private String serviceType;
+    @Metadata(label = "advanced")
+    private ReactiveStreamsEngineConfiguration reactiveStreamsEngineConfiguration;
 
     private CamelReactiveStreamsService service;
 
@@ -81,15 +87,15 @@ public class ReactiveStreamsComponent extends DefaultComponent {
     // Properties
     // ****************************************
 
-    public ReactiveStreamsEngineConfiguration getInternalEngineConfiguration() {
-        return internalEngineConfiguration;
+    public ReactiveStreamsEngineConfiguration getReactiveStreamsEngineConfiguration() {
+        return reactiveStreamsEngineConfiguration;
     }
 
     /**
-     * Configures the internal engine for Reactive Streams.
+     * To use an existing reactive stream engine configuration.
      */
-    public void setInternalEngineConfiguration(ReactiveStreamsEngineConfiguration internalEngineConfiguration) {
-        this.internalEngineConfiguration = internalEngineConfiguration;
+    public void setReactiveStreamsEngineConfiguration(ReactiveStreamsEngineConfiguration reactiveStreamsEngineConfiguration) {
+        this.reactiveStreamsEngineConfiguration = reactiveStreamsEngineConfiguration;
     }
 
     public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
@@ -118,17 +124,57 @@ public class ReactiveStreamsComponent extends DefaultComponent {
         this.serviceType = serviceType;
     }
 
+    public String getThreadPoolName() {
+        return threadPoolName;
+    }
+
+    /**
+     * The name of the thread pool used by the reactive streams internal engine.
+     */
+    public void setThreadPoolName(String threadPoolName) {
+        this.threadPoolName = threadPoolName;
+    }
+
+    public int getThreadPoolMinSize() {
+        return threadPoolMinSize;
+    }
+
+    /**
+     * The minimum number of threads used by the reactive streams internal engine.
+     */
+    public void setThreadPoolMinSize(int threadPoolMinSize) {
+        this.threadPoolMinSize = threadPoolMinSize;
+    }
+
+    public int getThreadPoolMaxSize() {
+        return threadPoolMaxSize;
+    }
+
+    /**
+     * The maximum number of threads used by the reactive streams internal engine.
+     */
+    public void setThreadPoolMaxSize(int threadPoolMaxSize) {
+        this.threadPoolMaxSize = threadPoolMaxSize;
+    }
+
     /**
      * Lazy creation of the CamelReactiveStreamsService
      *
      * @return the reactive streams service
      */
     public synchronized CamelReactiveStreamsService getReactiveStreamsService() {
+        if (reactiveStreamsEngineConfiguration == null) {
+            reactiveStreamsEngineConfiguration = new ReactiveStreamsEngineConfiguration();
+            reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
+            reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
+            reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
+        }
+
         if (service == null) {
             this.service = ReactiveStreamsHelper.resolveReactiveStreamsService(
                 getCamelContext(),
                 this.serviceType,
-                this.internalEngineConfiguration
+                this.reactiveStreamsEngineConfiguration
             );
 
             try {
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java
index 0418f10..53af7c5 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java
@@ -16,18 +16,13 @@
  */
 package org.apache.camel.component.reactive.streams.engine;
 
-import org.apache.camel.spi.Metadata;
-
 /**
  * Configuration parameters for the Camel internal reactive-streams engine.
  */
-public class ReactiveStreamsEngineConfiguration implements Cloneable {
+public class ReactiveStreamsEngineConfiguration {
 
-    @Metadata(defaultValue = "CamelReactiveStreamsWorker", description = "The name of the thread pool used by the reactive streams internal engine.")
     private String threadPoolName = "CamelReactiveStreamsWorker";
-    @Metadata(description = "The minimum number of threads used by the reactive streams internal engine.")
     private int threadPoolMinSize;
-    @Metadata(defaultValue = "10", description = "The maximum number of threads used by the reactive streams internal engine.")
     private int threadPoolMaxSize = 10;
 
     public ReactiveStreamsEngineConfiguration() {
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java
index f3cfae5..7f27341 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java
@@ -49,6 +49,47 @@ public interface ReactiveStreamsComponentBuilderFactory {
             extends
                 ComponentBuilder<ReactiveStreamsComponent> {
         /**
+         * The maximum number of threads used by the reactive streams internal
+         * engine.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: common
+         */
+        default ReactiveStreamsComponentBuilder threadPoolMaxSize(
+                int threadPoolMaxSize) {
+            doSetProperty("threadPoolMaxSize", threadPoolMaxSize);
+            return this;
+        }
+        /**
+         * The minimum number of threads used by the reactive streams internal
+         * engine.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Group: common
+         */
+        default ReactiveStreamsComponentBuilder threadPoolMinSize(
+                int threadPoolMinSize) {
+            doSetProperty("threadPoolMinSize", threadPoolMinSize);
+            return this;
+        }
+        /**
+         * The name of the thread pool used by the reactive streams internal
+         * engine.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Default: CamelReactiveStreamsWorker
+         * Group: common
+         */
+        default ReactiveStreamsComponentBuilder threadPoolName(
+                java.lang.String threadPoolName) {
+            doSetProperty("threadPoolName", threadPoolName);
+            return this;
+        }
+        /**
          * Allows for bridging the consumer to the Camel routing Error Handler,
          * which mean any exceptions occurred while the consumer is trying to
          * pickup incoming messages, or the likes, will now be processed as a
@@ -118,16 +159,16 @@ public interface ReactiveStreamsComponentBuilderFactory {
             return this;
         }
         /**
-         * Configures the internal engine for Reactive Streams.
+         * To use an existing reactive stream engine configuration.
          * 
          * The option is a:
          * <code>org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration</code> type.
          * 
          * Group: advanced
          */
-        default ReactiveStreamsComponentBuilder internalEngineConfiguration(
-                org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration internalEngineConfiguration) {
-            doSetProperty("internalEngineConfiguration", internalEngineConfiguration);
+        default ReactiveStreamsComponentBuilder reactiveStreamsEngineConfiguration(
+                org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration reactiveStreamsEngineConfiguration) {
+            doSetProperty("reactiveStreamsEngineConfiguration", reactiveStreamsEngineConfiguration);
             return this;
         }
         /**
@@ -162,11 +203,14 @@ public interface ReactiveStreamsComponentBuilderFactory {
                 String name,
                 Object value) {
             switch (name) {
+            case "threadPoolMaxSize": ((ReactiveStreamsComponent) component).setThreadPoolMaxSize((int) value); return true;
+            case "threadPoolMinSize": ((ReactiveStreamsComponent) component).setThreadPoolMinSize((int) value); return true;
+            case "threadPoolName": ((ReactiveStreamsComponent) component).setThreadPoolName((java.lang.String) value); return true;
             case "bridgeErrorHandler": ((ReactiveStreamsComponent) component).setBridgeErrorHandler((boolean) value); return true;
             case "backpressureStrategy": ((ReactiveStreamsComponent) component).setBackpressureStrategy((org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy) value); return true;
             case "lazyStartProducer": ((ReactiveStreamsComponent) component).setLazyStartProducer((boolean) value); return true;
             case "basicPropertyBinding": ((ReactiveStreamsComponent) component).setBasicPropertyBinding((boolean) value); return true;
-            case "internalEngineConfiguration": ((ReactiveStreamsComponent) component).setInternalEngineConfiguration((org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration) value); return true;
+            case "reactiveStreamsEngineConfiguration": ((ReactiveStreamsComponent) component).setReactiveStreamsEngineConfiguration((org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration) value); return true;
             case "serviceType": ((ReactiveStreamsComponent) component).setServiceType((java.lang.String) value); return true;
             default: return false;
             }