You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 09:20:19 UTC

[camel-k-runtime] 02/03: Make kamelet:sink and kamelet:source work with workaround here (need to find nicer solution in camel-core)

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit cb2b8934fbb7226e40c2ae965c84d9b768edc178
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Sep 28 18:17:18 2020 +0200

    Make kamelet:sink and kamelet:source work with workaround here (need to find nicer solution in camel-core)
---
 .../kamelet/KameletComponentConfigurer.java        |   6 ++
 .../kamelet/KameletEndpointConfigurer.java         |  11 +++
 .../apache/camel/component/kamelet/kamelet.json    |   5 +
 .../camel/component/kamelet/KameletComponent.java  | 101 ++++++++++++++++++++-
 .../camel/component/kamelet/KameletEndpoint.java   |   3 +
 5 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java
index a0a384d..7c3717a 100644
--- a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java
+++ b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java
@@ -21,10 +21,12 @@ public class KameletComponentConfigurer extends PropertyConfigurerSupport implem
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "block": target.setBlock(property(camelContext, boolean.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "timeout": target.setTimeout(property(camelContext, long.class, value)); return true;
         default: return false;
         }
     }
@@ -33,8 +35,10 @@ public class KameletComponentConfigurer extends PropertyConfigurerSupport implem
     public Map<String, Object> getAllOptions(Object target) {
         Map<String, Object> answer = new CaseInsensitiveMap();
         answer.put("basicPropertyBinding", boolean.class);
+        answer.put("block", boolean.class);
         answer.put("bridgeErrorHandler", boolean.class);
         answer.put("lazyStartProducer", boolean.class);
+        answer.put("timeout", long.class);
         return answer;
     }
 
@@ -44,10 +48,12 @@ public class KameletComponentConfigurer extends PropertyConfigurerSupport implem
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": return target.isBasicPropertyBinding();
+        case "block": return target.isBlock();
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "timeout": return target.getTimeout();
         default: return null;
         }
     }
diff --git a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
index bb99f63..6869355 100644
--- a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
+++ b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
@@ -21,15 +21,19 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "block": target.setBlock(property(camelContext, boolean.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
         case "exceptionhandler":
         case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
         case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+        case "kameletproperties":
+        case "kameletProperties": target.setKameletProperties(property(camelContext, java.util.Map.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
         case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
+        case "timeout": target.setTimeout(property(camelContext, long.class, value)); return true;
         default: return false;
         }
     }
@@ -38,11 +42,14 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
     public Map<String, Object> getAllOptions(Object target) {
         Map<String, Object> answer = new CaseInsensitiveMap();
         answer.put("basicPropertyBinding", boolean.class);
+        answer.put("block", boolean.class);
         answer.put("bridgeErrorHandler", boolean.class);
         answer.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class);
         answer.put("exchangePattern", org.apache.camel.ExchangePattern.class);
+        answer.put("kameletProperties", java.util.Map.class);
         answer.put("lazyStartProducer", boolean.class);
         answer.put("synchronous", boolean.class);
+        answer.put("timeout", long.class);
         return answer;
     }
 
@@ -52,15 +59,19 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": return target.isBasicPropertyBinding();
+        case "block": return target.isBlock();
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "exceptionhandler":
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
         case "exchangePattern": return target.getExchangePattern();
+        case "kameletproperties":
+        case "kameletProperties": return target.getKameletProperties();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
         case "synchronous": return target.isSynchronous();
+        case "timeout": return target.getTimeout();
         default: return null;
         }
     }
diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
index 2da7754..a66f1ca 100644
--- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
+++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
@@ -22,7 +22,9 @@
   },
   "componentProperties": {
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by [...]
+    "block": { "kind": "property", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "description": "If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active." },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the r [...]
+    "timeout": { "kind": "property", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "30000", "description": "The timeout value to use if block is enabled." },
     "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" }
   },
   "properties": {
@@ -31,7 +33,10 @@
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "block": { "kind": "parameter", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "description": "If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active." },
+    "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Custom properties for kamelet" },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the  [...]
+    "timeout": { "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "30000", "description": "The timeout value to use if block is enabled." },
     "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
     "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." }
   }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 91a2514..1cc92b2 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -17,8 +17,12 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -26,9 +30,16 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.VetoCamelContextStartException;
+import org.apache.camel.model.EndpointRequiredDefinition;
+import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.ProcessorDefinitionHelper;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
+import org.apache.camel.model.RouteTemplateParameterDefinition;
+import org.apache.camel.model.ToDefinition;
 import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.LifecycleStrategySupport;
@@ -36,6 +47,9 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+
 /**
  * The Kamelet Component provides support for materializing routes templates.
  */
@@ -57,6 +71,11 @@ public class KameletComponent extends DefaultComponent {
     }
 
     @Override
+    public Endpoint createEndpoint(String uri) throws Exception {
+        return super.createEndpoint(uri);
+    }
+
+    @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
         final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
@@ -209,14 +228,92 @@ public class KameletComponent extends DefaultComponent {
             LOGGER.debug("Creating route from template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
 
             final ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
-            final String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+            final String id = addRouteFromTemplate(context, endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
             final RouteDefinition def = context.getRouteDefinition(id);
 
             if (!def.isPrepared()) {
-                context.startRouteDefinitions(List.of(def));
+                // when starting the route that was created from the template
+                // then we must provide the route id as local properties to the properties component
+                // as this route id is used internal by kamelets when they are starting
+                PropertiesComponent pc = context.getPropertiesComponent();
+                try {
+                    Properties prop = new Properties();
+                    prop.put("routeId", id);
+                    pc.setLocalProperties(prop);
+                    context.startRouteDefinitions(List.of(def));
+                } finally {
+                    pc.setLocalProperties(null);
+                }
             }
 
             LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
         }
+
+        private static String addRouteFromTemplate(final ModelCamelContext context, final String routeId, final String routeTemplateId, final Map<String, Object> parameters)
+                throws Exception {
+            RouteTemplateDefinition target = null;
+            for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
+                if (routeTemplateId.equals(def.getId())) {
+                    target = def;
+                    break;
+                }
+            }
+            if (target == null) {
+                throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
+            }
+
+            StringJoiner templatesBuilder = new StringJoiner(", ");
+            final Map<String, Object> prop = new HashMap();
+            // include default values first from the template (and validate that we have inputs for all required parameters)
+            if (target.getTemplateParameters() != null) {
+                for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
+                    if (temp.getDefaultValue() != null) {
+                        prop.put(temp.getName(), temp.getDefaultValue());
+                    } else {
+                        // this is a required parameter do we have that as input
+                        if (!parameters.containsKey(temp.getName())) {
+                            templatesBuilder.add(temp.getName());
+                        }
+                    }
+                }
+            }
+            if (templatesBuilder.length() > 0) {
+                throw new IllegalArgumentException(
+                        "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
+                                + templatesBuilder.toString());
+            }
+            // then override with user parameters
+            if (parameters != null) {
+                prop.putAll(parameters);
+            }
+
+            RouteDefinition def = target.asRouteDefinition();
+            // must make deep copy of input
+            def.setInput(null);
+            def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
+            if (routeId != null) {
+                def.setId(routeId);
+            }
+            // must make the source and simk endpoints are unique by appending the route id before we create the route from the template
+            if (def.getInput().getEndpointUri().startsWith("kamelet:source")) {
+                def.getInput().setUri("kamelet:source?routeId=" + routeId);
+            }
+            Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
+            while (it.hasNext()) {
+                ToDefinition to = it.next();
+                if (to.getEndpointUri().startsWith("kamelet:sink")) {
+                    // TODO: must make deep copy
+                    to.setUri("kamelet:sink?routeId=" + routeId);
+                }
+            }
+
+
+            def.setTemplateParameters(prop);
+            context.removeRouteDefinition(def);
+            context.getRouteDefinitions().add(def);
+
+            return def.getId();
+        }
+
     }
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 80d8e10..2d6e883 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -127,6 +127,9 @@ public class KameletEndpoint extends DefaultEndpoint {
         return routeId;
     }
 
+    /**
+     * Custom properties for kamelet
+     */
     public void setKameletProperties(Map<String, Object> kameletProperties) {
         if (kameletProperties != null) {
             this.kameletProperties.clear();