You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 09:20:20 UTC

[camel-k-runtime] 03/03: kamelet source/sink component #490

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 979288ecc735e8613a39ca5d627d2f288fb68c2f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Mon Sep 28 20:59:58 2020 +0200

    kamelet source/sink component #490
---
 .../src/main/resources/application.properties      |   2 +-
 .../src/test/resources/routes/set-body.yaml        |   2 +-
 .../src/test/resources/routes/to-upper.yaml        |   2 +-
 .../apache/camel/component/kamelet/kamelet.json    |   2 +-
 .../apache/camel/component/kamelet/Kamelet.java    |  97 ++++++++++++-
 .../camel/component/kamelet/KameletComponent.java  | 154 +++++++--------------
 .../camel/component/kamelet/KameletEndpoint.java   |   7 +-
 .../camel/component/kamelet/KameletProducer.java   |   2 +-
 .../src/test/resources/log4j2-test.xml             |   2 +
 9 files changed, 151 insertions(+), 119 deletions(-)

diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
index 3ce5493..5b8f41a 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
@@ -18,6 +18,6 @@
 #
 # Quarkus
 #
-quarkus.log.console.enable = false
+quarkus.log.console.enable = true
 quarkus.banner.enabled     = false
 
diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
index 55347e9..c311da1 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
@@ -16,7 +16,7 @@
 #
 
 - from:
-    uri: "direct:{{routeId}}"
+    uri: "kamelet:source"
     steps:
       - set-body:
           constant: "{{bodyValue}}"
\ No newline at end of file
diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
index 74105f1..ba51838 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
@@ -16,7 +16,7 @@
 #
 
 - from:
-    uri: "direct:{{routeId}}"
+    uri: "kamelet:source"
     steps:
       - set-body:
           constant: "{{message}}"
diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
index a66f1ca..4dd132f 100644
--- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
+++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
@@ -6,7 +6,7 @@
     "description": "The Kamelet Component provides support for interacting with Knative",
     "deprecated": false,
     "firstVersion": "3.5.0",
-    "label": "camel-k",
+    "label": "core",
     "javaType": "org.apache.camel.component.kamelet.KameletComponent",
     "supportLevel": "Preview",
     "groupId": "org.apache.camel.k",
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index 689c1ed..14ecc43 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -17,19 +17,32 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.function.Predicate;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.model.FromDefinition;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
+import org.apache.camel.model.RouteTemplateParameterDefinition;
+import org.apache.camel.model.ToDefinition;
 import org.apache.camel.spi.PropertiesComponent;
+import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.util.StringHelper;
 
+import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+
 public final class Kamelet {
     public static final String PROPERTIES_PREFIX = "camel.kamelet.";
     public static final String SCHEME = "kamelet";
     public static final String SOURCE_ID = "source";
     public static final String SINK_ID = "sink";
+    public static final String PARAM_ROUTE_ID = "routeId";
+    public static final String PARAM_TEMPLATE_ID = "templateId";
 
     private Kamelet() {
     }
@@ -38,9 +51,14 @@ public final class Kamelet {
         return item -> item.startsWith(prefix);
     }
 
-    public static String extractTemplateId(CamelContext context, String remaining) {
+    public static String extractTemplateId(CamelContext context, String remaining, Map<String, Object> parameters) {
+        Object param = parameters.get(PARAM_TEMPLATE_ID);
+        if (param != null) {
+            return CamelContextHelper.mandatoryConvertTo(context, String.class, param);
+        }
+
         if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
-            return context.resolvePropertyPlaceholders("{{templateId}}");
+            return context.resolvePropertyPlaceholders("{{" + PARAM_TEMPLATE_ID + "}}");
         }
 
         String answer = StringHelper.before(remaining, "/");
@@ -51,14 +69,19 @@ public final class Kamelet {
         return answer;
     }
 
-    public static String extractRouteId(CamelContext context, String remaining) {
+    public static String extractRouteId(CamelContext context, String remaining, Map<String, Object> parameters) {
+        Object param = parameters.get(PARAM_ROUTE_ID);
+        if (param != null) {
+            return CamelContextHelper.mandatoryConvertTo(context, String.class, param);
+        }
+
         if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
-            return context.resolvePropertyPlaceholders("{{routeId}}");
+            return context.resolvePropertyPlaceholders("{{" + PARAM_ROUTE_ID + "}}");
         }
 
         String answer = StringHelper.after(remaining, "/");
         if (answer == null) {
-            answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid();
+            answer = extractTemplateId(context, remaining, parameters) + "-" + context.getUuidGenerator().generateUuid();
         }
 
         return answer;
@@ -84,4 +107,68 @@ public final class Kamelet {
 
         return properties;
     }
+
+    public static String addRouteFromTemplate(ModelCamelContext context, String routeId, String routeTemplateId, Map<String, Object> parameters) throws Exception {
+        RouteTemplateDefinition target = null;
+        for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
+            if (routeTemplateId.equals(def.getId())) {
+                target = def;
+                break;
+            }
+        }
+        if (target == null) {
+            throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
+        }
+
+        StringJoiner templatesBuilder = new StringJoiner(", ");
+        final Map<String, Object> prop = new HashMap<>();
+        // include default values first from the template (and validate that we have inputs for all required parameters)
+        if (target.getTemplateParameters() != null) {
+            for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
+                if (temp.getDefaultValue() != null) {
+                    prop.put(temp.getName(), temp.getDefaultValue());
+                } else {
+                    // this is a required parameter do we have that as input
+                    if (!parameters.containsKey(temp.getName())) {
+                        templatesBuilder.add(temp.getName());
+                    }
+                }
+            }
+        }
+        if (templatesBuilder.length() > 0) {
+            throw new IllegalArgumentException(
+                "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
+                    + templatesBuilder.toString());
+        }
+        // then override with user parameters
+        if (parameters != null) {
+            prop.putAll(parameters);
+        }
+
+        RouteDefinition def = target.asRouteDefinition();
+        // must make deep copy of input
+        def.setInput(null);
+        def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
+        if (routeId != null) {
+            def.setId(routeId);
+        }
+        // make sure the source and sink endpoints are unique by appending the route id before we create the route from the template
+        if (def.getInput().getEndpointUri().startsWith("kamelet:source") || def.getInput().getEndpointUri().startsWith("kamelet//source")) {
+            def.getInput().setUri("kamelet:source?" + PARAM_ROUTE_ID + "=" + routeId);
+        }
+        Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
+        while (it.hasNext()) {
+            ToDefinition to = it.next();
+            if (to.getEndpointUri().startsWith("kamelet:sink") || to.getEndpointUri().startsWith("kamelet://sink")) {
+                to.setUri("kamelet:sink?" + PARAM_ROUTE_ID + "=" + routeId);
+            }
+        }
+
+        def.setTemplateParameters(prop);
+        context.removeRouteDefinition(def);
+        context.getRouteDefinitions().add(def);
+
+        return def.getId();
+    }
+
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 1cc92b2..9696c90 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -17,12 +17,9 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,14 +27,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.VetoCamelContextStartException;
-import org.apache.camel.model.EndpointRequiredDefinition;
-import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.ProcessorDefinitionHelper;
 import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.model.RouteTemplateDefinition;
-import org.apache.camel.model.RouteTemplateParameterDefinition;
-import org.apache.camel.model.ToDefinition;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.annotations.Component;
@@ -47,8 +38,9 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID;
+import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID;
+import static org.apache.camel.component.kamelet.Kamelet.addRouteFromTemplate;
 
 /**
  * The Kamelet Component provides support for materializing routes templates.
@@ -71,23 +63,53 @@ public class KameletComponent extends DefaultComponent {
     }
 
     @Override
-    public Endpoint createEndpoint(String uri) throws Exception {
-        return super.createEndpoint(uri);
-    }
-
-    @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
-        final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
-        final String newUri = "kamelet:" + templateId + "/" + routeId;
+        final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining, parameters);
+        final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining, parameters);
+
+        parameters.remove(PARAM_TEMPLATE_ID);
+        parameters.remove(PARAM_ROUTE_ID);
 
         final KameletEndpoint endpoint;
 
-        if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) {
-            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) {
+        if (Kamelet.SOURCE_ID.equals(remaining) || Kamelet.SINK_ID.equals(remaining)) {
+            //
+            // if remaining is either `source` or `sink` then it is a virtual
+            // endpoint that is used inside the kamelet definition to mark it
+            // as in/out endpoint.
+            //
+            // The following snippet defines a template which will act as a
+            // consumer for this Kamelet:
+            //
+            //     from("kamelet:source")
+            //         .to("log:info")
+            //
+            // The following snippet defines a template which will act as a
+            // producer for this Kamelet:
+            //
+            //     from("telegram:bots")
+            //         .to("kamelet:sink")
+            //
+            // Note that at the moment, there's no enforcement around `source`
+            // and `sink` to be defined on the correct side (consumer or producer)
+            //
+            endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers);
+
+            // forward component properties
+            endpoint.setBlock(block);
+            endpoint.setTimeout(timeout);
+
+            // set endpoint specific properties
+            setProperties(endpoint, parameters);
+        } else {
+            endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers) {
                 @Override
                 protected void doInit() throws Exception {
                     super.doInit();
+                    //
+                    // since this is the real kamelet, then we need to hand it
+                    // over to the tracker.
+                    //
                     lifecycleHandler.track(this);
                 }
             };
@@ -110,20 +132,11 @@ public class KameletComponent extends DefaultComponent {
             //
             Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
             kameletProperties.putAll(parameters);
-            kameletProperties.put("templateId", templateId);
-            kameletProperties.put("routeId", routeId);
+            kameletProperties.put(PARAM_TEMPLATE_ID, templateId);
+            kameletProperties.put(PARAM_ROUTE_ID, routeId);
 
             // set kamelet specific properties
             endpoint.setKameletProperties(kameletProperties);
-        } else {
-            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers);
-
-            // forward component properties
-            endpoint.setBlock(block);
-            endpoint.setTimeout(timeout);
-
-            // set endpoint specific properties
-            setProperties(endpoint, parameters);
         }
 
         return endpoint;
@@ -193,8 +206,6 @@ public class KameletComponent extends DefaultComponent {
         @Override
         public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException {
             if (!this.initialized.compareAndExchange(false, true)) {
-                ModelCamelContext mcc = context.adapt(ModelCamelContext.class);
-
                 for (KameletEndpoint endpoint : endpoints) {
                     try {
                         createRouteForEndpoint(endpoint);
@@ -233,12 +244,14 @@ public class KameletComponent extends DefaultComponent {
 
             if (!def.isPrepared()) {
                 // when starting the route that was created from the template
-                // then we must provide the route id as local properties to the properties component
-                // as this route id is used internal by kamelets when they are starting
+                // then we must provide the route id as local properties to the
+                // properties component as this route id is used internally by
+                // kamelets when they are starting
                 PropertiesComponent pc = context.getPropertiesComponent();
                 try {
                     Properties prop = new Properties();
-                    prop.put("routeId", id);
+                    prop.put(PARAM_TEMPLATE_ID, endpoint.getTemplateId());
+                    prop.put(PARAM_ROUTE_ID, id);
                     pc.setLocalProperties(prop);
                     context.startRouteDefinitions(List.of(def));
                 } finally {
@@ -248,72 +261,5 @@ public class KameletComponent extends DefaultComponent {
 
             LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
         }
-
-        private static String addRouteFromTemplate(final ModelCamelContext context, final String routeId, final String routeTemplateId, final Map<String, Object> parameters)
-                throws Exception {
-            RouteTemplateDefinition target = null;
-            for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
-                if (routeTemplateId.equals(def.getId())) {
-                    target = def;
-                    break;
-                }
-            }
-            if (target == null) {
-                throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
-            }
-
-            StringJoiner templatesBuilder = new StringJoiner(", ");
-            final Map<String, Object> prop = new HashMap();
-            // include default values first from the template (and validate that we have inputs for all required parameters)
-            if (target.getTemplateParameters() != null) {
-                for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
-                    if (temp.getDefaultValue() != null) {
-                        prop.put(temp.getName(), temp.getDefaultValue());
-                    } else {
-                        // this is a required parameter do we have that as input
-                        if (!parameters.containsKey(temp.getName())) {
-                            templatesBuilder.add(temp.getName());
-                        }
-                    }
-                }
-            }
-            if (templatesBuilder.length() > 0) {
-                throw new IllegalArgumentException(
-                        "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
-                                + templatesBuilder.toString());
-            }
-            // then override with user parameters
-            if (parameters != null) {
-                prop.putAll(parameters);
-            }
-
-            RouteDefinition def = target.asRouteDefinition();
-            // must make deep copy of input
-            def.setInput(null);
-            def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
-            if (routeId != null) {
-                def.setId(routeId);
-            }
-            // must make the source and simk endpoints are unique by appending the route id before we create the route from the template
-            if (def.getInput().getEndpointUri().startsWith("kamelet:source")) {
-                def.getInput().setUri("kamelet:source?routeId=" + routeId);
-            }
-            Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
-            while (it.hasNext()) {
-                ToDefinition to = it.next();
-                if (to.getEndpointUri().startsWith("kamelet:sink")) {
-                    // TODO: must make deep copy
-                    to.setUri("kamelet:sink?routeId=" + routeId);
-                }
-            }
-
-
-            def.setTemplateParameters(prop);
-            context.removeRouteDefinition(def);
-            context.getRouteDefinitions().add(def);
-
-            return def.getId();
-        }
-
     }
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 2d6e883..c3760f3 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -20,6 +20,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -30,8 +31,6 @@ import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @UriEndpoint(
     firstVersion = "3.5.0",
@@ -39,10 +38,8 @@ import org.slf4j.LoggerFactory;
     syntax = "kamelet:templateId/routeId",
     title = "Kamelet",
     lenientProperties = true,
-    label = "camel-k")
+    category = Category.CORE)
 public class KameletEndpoint extends DefaultEndpoint {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class);
-
     @Metadata(required = true)
     @UriPath(description = "The Route Template ID")
     private final String templateId;
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index 9e6d86d..10bd42c 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -48,7 +48,7 @@ final class KameletProducer extends DefaultAsyncProducer {
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            final KameletConsumer consumer = getEndpoint().getConsumer();;
+            final KameletConsumer consumer = getEndpoint().getConsumer();
 
             if (consumer != null) {
                 return consumer.getAsyncProcessor().process(exchange, callback);
diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml
index 8ce15f1..d5df1ad 100644
--- a/components/camel-kamelet/src/test/resources/log4j2-test.xml
+++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml
@@ -32,7 +32,9 @@
     <Logger name="org.apache.camel.component.kamelet" level="TRACE"/>
 
     <Root level="INFO">
+      <!--
       <AppenderRef ref="STDOUT"/>
+      -->
       <AppenderRef ref="FILE"/>
     </Root>
   </Loggers>