You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 09:20:17 UTC

[camel-k-runtime] branch master updated (14210d1 -> 979288e)

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git.


    from 14210d1  Updated CHANGELOG.md
     new 4dede2a  kamelet source/sink component #490
     new cb2b893  Make kamelet:sink and kamelet:source work with workaround here (need to find nicer solution in camel-core)
     new 979288e  kamelet source/sink component #490

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/resources/application.properties      |   2 +-
 .../src/test/resources/routes/set-body.yaml        |   2 +-
 .../src/test/resources/routes/to-upper.yaml        |   2 +-
 components/camel-kamelet/pom.xml                   |   9 +-
 .../kamelet/KameletComponentConfigurer.java        |   6 +
 .../kamelet/KameletEndpointConfigurer.java         |  11 ++
 .../apache/camel/component/kamelet/kamelet.json    |   7 +-
 .../apache/camel/component/kamelet/Kamelet.java    | 115 ++++++++++--
 .../camel/component/kamelet/KameletComponent.java  | 192 +++++++++++++++++----
 .../camel/component/kamelet/KameletConsumer.java   |  74 ++++++++
 .../camel/component/kamelet/KameletEndpoint.java   | 159 +++++++++--------
 .../camel/component/kamelet/KameletProducer.java   |  70 ++++++++
 .../camel/component/kamelet/KameletBasicTest.java  |   4 +-
 .../component/kamelet/KameletPropertiesTest.java   |   2 +-
 .../camel/component/kamelet/KameletRouteTest.java  |  23 +--
 .../component/kamelet/KameletValidationTest.java   |   2 +-
 16 files changed, 519 insertions(+), 161 deletions(-)
 create mode 100644 components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
 create mode 100644 components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java


[camel-k-runtime] 01/03: kamelet source/sink component #490

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 4dede2ab1d61551b675b6bde82dae236511ac07e
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Mon Sep 28 11:56:22 2020 +0200

    kamelet source/sink component #490
---
 components/camel-kamelet/pom.xml                   |   9 +-
 .../apache/camel/component/kamelet/Kamelet.java    |  32 ++---
 .../camel/component/kamelet/KameletComponent.java  | 143 ++++++++++++++-----
 .../camel/component/kamelet/KameletConsumer.java   |  74 ++++++++++
 .../camel/component/kamelet/KameletEndpoint.java   | 157 ++++++++++++---------
 .../camel/component/kamelet/KameletProducer.java   |  70 +++++++++
 .../camel/component/kamelet/KameletBasicTest.java  |   4 +-
 .../component/kamelet/KameletPropertiesTest.java   |   2 +-
 .../camel/component/kamelet/KameletRouteTest.java  |  23 +--
 .../component/kamelet/KameletValidationTest.java   |   2 +-
 .../src/test/resources/log4j2-test.xml             |   2 -
 11 files changed, 361 insertions(+), 157 deletions(-)

diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml
index 21232ac..426c1f7 100644
--- a/components/camel-kamelet/pom.xml
+++ b/components/camel-kamelet/pom.xml
@@ -40,10 +40,6 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-engine</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-direct</artifactId>
-        </dependency>
 
         <!-- ****************************** -->
         <!--                                -->
@@ -78,6 +74,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-direct</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-test-junit5</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index d9ac81b..689c1ed 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -17,24 +17,19 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Predicate;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public final class Kamelet {
     public static final String PROPERTIES_PREFIX = "camel.kamelet.";
     public static final String SCHEME = "kamelet";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(Kamelet.class);
+    public static final String SOURCE_ID = "source";
+    public static final String SINK_ID = "sink";
 
     private Kamelet() {
     }
@@ -43,20 +38,11 @@ public final class Kamelet {
         return item -> item.startsWith(prefix);
     }
 
-    public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception {
-        LOGGER.debug("Creating route from template {}", endpoint.getTemplateId());
-
-        ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
-        String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
-        RouteDefinition def = context.getRouteDefinition(id);
-        if (!def.isPrepared()) {
-            context.startRouteDefinitions(List.of(def));
+    public static String extractTemplateId(CamelContext context, String remaining) {
+        if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
+            return context.resolvePropertyPlaceholders("{{templateId}}");
         }
 
-        LOGGER.debug("Route {} created from template {}", id, endpoint.getTemplateId());
-    }
-
-    public static  String extractTemplateId(CamelContext context, String remaining) {
         String answer = StringHelper.before(remaining, "/");
         if (answer == null) {
             answer = remaining;
@@ -65,7 +51,11 @@ public final class Kamelet {
         return answer;
     }
 
-    public static  String extractRouteId(CamelContext context, String remaining) {
+    public static String extractRouteId(CamelContext context, String remaining) {
+        if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
+            return context.resolvePropertyPlaceholders("{{routeId}}");
+        }
+
         String answer = StringHelper.after(remaining, "/");
         if (answer == null) {
             answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid();
@@ -74,7 +64,7 @@ public final class Kamelet {
         return answer;
     }
 
-    public static  Map<String, Object> extractKameletProperties(CamelContext context, String... elements) {
+    public static Map<String, Object> extractKameletProperties(CamelContext context, String... elements) {
         PropertiesComponent pc = context.getPropertiesComponent();
         Map<String, Object> properties = new HashMap<>();
         String prefix = Kamelet.PROPERTIES_PREFIX;
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 7a97d9c..91a2514 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -19,64 +19,120 @@ package org.apache.camel.component.kamelet;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.VetoCamelContextStartException;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * The Kamelet Component provides support for interacting with <a href="https://knative.dev">Knative</a>.
+ * The Kamelet Component provides support for materializing route templates.
  */
 @Component(Kamelet.SCHEME)
 public class KameletComponent extends DefaultComponent {
-    private final LifecycleHandler lifecycleHandler;
+    private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
 
-    public KameletComponent() {
-        this(null);
-    }
+    private final Map<String, KameletConsumer> consumers;
+    private final LifecycleHandler lifecycleHandler;
 
-    public KameletComponent(CamelContext context) {
-        super(context);
+    @Metadata(label = "producer", defaultValue = "true")
+    private boolean block = true;
+    @Metadata(label = "producer", defaultValue = "30000")
+    private long timeout = 30000L;
 
+    public KameletComponent() {
         this.lifecycleHandler = new LifecycleHandler();
+        this.consumers = new ConcurrentHashMap<>();
     }
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
         final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
+        final String newUri = "kamelet:" + templateId + "/" + routeId;
+
+        final KameletEndpoint endpoint;
 
-        //
-        // The properties for the kamelets are determined by global properties
-        // and local endpoint parameters,
-        //
-        // Global parameters are loaded in the following order:
-        //
-        //   camel.kamelet." + templateId
-        //   camel.kamelet." + templateId + "." routeId
-        //
-        Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
-        kameletProperties.putAll(parameters);
-        kameletProperties.put("templateId", templateId);
-        kameletProperties.put("routeId", routeId);
-
-        // Remaining parameter should be related to the route and to avoid the
-        // parameters validation to fail, we need to clear the parameters map.
-        parameters.clear();
-
-        KameletEndpoint endpoint = new KameletEndpoint(uri, this, templateId, routeId, kameletProperties);
-
-        // No parameters are expected here.
-        setProperties(endpoint, parameters);
+        if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) {
+            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) {
+                @Override
+                protected void doInit() throws Exception {
+                    super.doInit();
+                    lifecycleHandler.track(this);
+                }
+            };
+
+            // forward component properties
+            endpoint.setBlock(block);
+            endpoint.setTimeout(timeout);
+
+            // set endpoint specific properties
+            setProperties(endpoint, parameters);
+
+            //
+            // The properties for the kamelets are determined by global properties
+            // and local endpoint parameters,
+            //
+            // Global parameters are loaded in the following order:
+            //
+            //   camel.kamelet." + templateId
+            //   camel.kamelet." + templateId + "." routeId
+            //
+            Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
+            kameletProperties.putAll(parameters);
+            kameletProperties.put("templateId", templateId);
+            kameletProperties.put("routeId", routeId);
+
+            // set kamelet specific properties
+            endpoint.setKameletProperties(kameletProperties);
+        } else {
+            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers);
+
+            // forward component properties
+            endpoint.setBlock(block);
+            endpoint.setTimeout(timeout);
+
+            // set endpoint specific properties
+            setProperties(endpoint, parameters);
+        }
 
         return endpoint;
     }
 
+    public boolean isBlock() {
+        return block;
+    }
+
+    /**
+     * If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block
+     * and wait for the consumer to become active.
+     */
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * The timeout value to use if block is enabled.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
     @Override
     protected void doInit() throws Exception {
         getCamelContext().addLifecycleStrategy(lifecycleHandler);
@@ -91,11 +147,11 @@ public class KameletComponent extends DefaultComponent {
     @Override
     protected void doStop() throws Exception {
         getCamelContext().getLifecycleStrategies().remove(lifecycleHandler);
-        super.doStop();
-    }
 
-    void onEndpointAdd(KameletEndpoint endpoint) {
-        lifecycleHandler.track(endpoint);
+        ServiceHelper.stopService(consumers.values());
+        consumers.clear();
+
+        super.doStop();
     }
 
     /*
@@ -118,9 +174,11 @@ public class KameletComponent extends DefaultComponent {
         @Override
         public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException {
             if (!this.initialized.compareAndExchange(false, true)) {
+                ModelCamelContext mcc = context.adapt(ModelCamelContext.class);
+
                 for (KameletEndpoint endpoint : endpoints) {
                     try {
-                        Kamelet.createRouteForEndpoint(endpoint);
+                        createRouteForEndpoint(endpoint);
                     } catch (Exception e) {
                         throw new VetoCamelContextStartException("Failure creating route from template: " + endpoint.getTemplateId(), e, context);
                     }
@@ -137,13 +195,28 @@ public class KameletComponent extends DefaultComponent {
         public void track(KameletEndpoint endpoint) {
             if (this.initialized.get()) {
                 try {
-                    Kamelet.createRouteForEndpoint(endpoint);
+                    createRouteForEndpoint(endpoint);
                 } catch (Exception e) {
                     throw RuntimeCamelException.wrapRuntimeException(e);
                 }
             } else {
+                LOGGER.debug("Tracking route template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
                 this.endpoints.add(endpoint);
             }
         }
+
+        public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception {
+            LOGGER.debug("Creating route from template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
+
+            final ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
+            final String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+            final RouteDefinition def = context.getRouteDefinition(id);
+
+            if (!def.isPrepared()) {
+                context.startRouteDefinitions(List.of(def));
+            }
+
+            LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
+        }
     }
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
new file mode 100644
index 0000000..c99d56c
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.Suspendable;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.support.DefaultConsumer;
+
+final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
+    public KameletConsumer(KameletEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    public KameletEndpoint getEndpoint() {
+        return (KameletEndpoint)super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        getEndpoint().addConsumer(this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        getEndpoint().removeConsumer(this);
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        getEndpoint().removeConsumer(this);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        getEndpoint().addConsumer(this);
+    }
+
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // deny stopping on shutdown as we want kamelet consumers to run in
+        // case some other queues depend on this consumer to run, so it can
+        // complete its exchanges
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        // return 0 as we do not have an internal memory queue with a variable
+        // size of inflight messages.
+        return 0;
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // noop
+    }
+}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 3209c6d..80d8e10 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -17,23 +17,21 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProducer;
 import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
-import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @UriEndpoint(
     firstVersion = "3.5.0",
@@ -43,34 +41,67 @@ import org.apache.camel.util.ObjectHelper;
     lenientProperties = true,
     label = "camel-k")
 public class KameletEndpoint extends DefaultEndpoint {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class);
+
     @Metadata(required = true)
     @UriPath(description = "The Route Template ID")
     private final String templateId;
-
     @Metadata(required = false)
     @UriPath(description = "The Route ID", defaultValueNote = "The ID will be auto-generated if not provided")
     private final String routeId;
 
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean block = true;
+    @UriParam(label = "producer", defaultValue = "30000")
+    private long timeout = 30000L;
+    @UriParam(label = "producer", defaultValue = "true")
+
     private final Map<String, Object> kameletProperties;
-    private final String kameletUri;
+    private final Map<String, KameletConsumer> consumers;
+    private final String key;
 
     public KameletEndpoint(
             String uri,
             KameletComponent component,
             String templateId,
             String routeId,
-            Map<String, Object> kameletProperties) {
+            Map<String, KameletConsumer> consumers) {
 
         super(uri, component);
 
         ObjectHelper.notNull(templateId, "template id");
         ObjectHelper.notNull(routeId, "route id");
-        ObjectHelper.notNull(kameletProperties, "kamelet properties");
 
         this.templateId = templateId;
         this.routeId = routeId;
-        this.kameletProperties = Collections.unmodifiableMap(kameletProperties);
-        this.kameletUri = "direct:" + routeId;
+        this.key = templateId + "/" + routeId;
+        this.kameletProperties = new HashMap<>();
+        this.consumers = consumers;
+    }
+
+    public boolean isBlock() {
+        return block;
+    }
+
+    /**
+     * If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block
+     * and wait for the consumer to become active.
+     */
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * The timeout value to use if block is enabled.
+     *
+     * @param timeout the timeout value
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
     }
 
     @Override
@@ -83,6 +114,11 @@ public class KameletEndpoint extends DefaultEndpoint {
         return true;
     }
 
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
     public String getTemplateId() {
         return templateId;
     }
@@ -91,88 +127,71 @@ public class KameletEndpoint extends DefaultEndpoint {
         return routeId;
     }
 
+    public void setKameletProperties(Map<String, Object> kameletProperties) {
+        if (kameletProperties != null) {
+            this.kameletProperties.clear();
+            this.kameletProperties.putAll(kameletProperties);
+        }
+    }
+
     public Map<String, Object> getKameletProperties() {
-        return kameletProperties;
+        return Collections.unmodifiableMap(kameletProperties);
     }
 
     @Override
     public Producer createProducer() throws Exception {
-        return new KameletProducer();
+        return new KameletProducer(this);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer answer = new KemeletConsumer(processor);
+        Consumer answer = new KameletConsumer(this, processor);
         configureConsumer(answer);
         return answer;
     }
 
-    @Override
-    protected void doInit() throws Exception {
-        super.doInit();
-        getComponent().onEndpointAdd(this);
-    }
-
     // *********************************
     //
     // Helpers
     //
     // *********************************
 
-    private class KemeletConsumer extends DefaultConsumer {
-        private volatile Endpoint endpoint;
-        private volatile Consumer consumer;
-
-        public KemeletConsumer(Processor processor) {
-            super(KameletEndpoint.this, processor);
-        }
-
-        @Override
-        protected void doStart() throws Exception {
-            endpoint = getCamelContext().getEndpoint(kameletUri);
-            consumer = endpoint.createConsumer(getProcessor());
-
-            ServiceHelper.startService(endpoint, consumer);
-            super.doStart();
-        }
-
-        @Override
-        protected void doStop() throws Exception {
-            ServiceHelper.stopService(consumer, endpoint);
-            super.doStop();
+    void addConsumer(KameletConsumer consumer) {
+        synchronized (consumers) {
+            if (consumers.putIfAbsent(key, consumer) != null) {
+                throw new IllegalArgumentException(
+                    "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
+            }
+            consumers.notifyAll();
         }
     }
 
-    private class KameletProducer extends DefaultAsyncProducer {
-        private volatile Endpoint endpoint;
-        private volatile AsyncProducer producer;
-
-        public KameletProducer() {
-            super(KameletEndpoint.this);
+    void removeConsumer(KameletConsumer consumer) {
+        synchronized (consumers) {
+            consumers.remove(key, consumer);
+            consumers.notifyAll();
         }
+    }
 
-        @Override
-        public boolean process(Exchange exchange, AsyncCallback callback) {
-            if (producer != null) {
-                return producer.process(exchange, callback);
-            } else {
-                callback.done(true);
-                return true;
+    KameletConsumer getConsumer() throws InterruptedException {
+        synchronized (consumers) {
+            KameletConsumer answer = consumers.get(key);
+            if (answer == null && block) {
+                StopWatch watch = new StopWatch();
+                for (; ; ) {
+                    answer =consumers.get(key);
+                    if (answer != null) {
+                        break;
+                    }
+                    long rem = timeout - watch.taken();
+                    if (rem <= 0) {
+                        break;
+                    }
+                    consumers.wait(rem);
+                }
             }
-        }
-
-        @Override
-        protected void doStart() throws Exception {
-            endpoint = getCamelContext().getEndpoint(kameletUri);
-            producer = endpoint.createAsyncProducer();
-            ServiceHelper.startService(endpoint, producer);
-            super.doStart();
-        }
 
-        @Override
-        protected void doStop() throws Exception {
-            ServiceHelper.stopService(producer, endpoint);
-            super.doStop();
+            return answer;
         }
     }
 }
\ No newline at end of file
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
new file mode 100644
index 0000000..9e6d86d
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+
+final class KameletProducer extends DefaultAsyncProducer {
+    public KameletProducer(KameletEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public KameletEndpoint getEndpoint() {
+        return (KameletEndpoint)super.getEndpoint();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        final KameletConsumer consumer = getEndpoint().getConsumer();
+
+        if (consumer != null) {
+            consumer.getProcessor().process(exchange);
+        } else {
+            exchange.setException(
+                new CamelExchangeException(
+                    "No consumers available on endpoint: " + getEndpoint(), exchange)
+            );
+        }
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            final KameletConsumer consumer = getEndpoint().getConsumer();;
+
+            if (consumer != null) {
+                return consumer.getAsyncProcessor().process(exchange, callback);
+            } else {
+                exchange.setException(
+                    new CamelExchangeException(
+                        "No consumers available on endpoint: " + getEndpoint(), exchange)
+                );
+
+                callback.done(true);
+                return true;
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+}
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
index e023e07..5826f21 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
@@ -78,13 +78,13 @@ public class KameletBasicTest extends CamelTestSupport {
             public void configure() throws Exception {
                 routeTemplate("setBody")
                     .templateParameter("bodyValue")
-                    .from("direct:{{routeId}}")
+                    .from("kamelet:source")
                     .setBody().constant("{{bodyValue}}");
 
                 routeTemplate("tick")
                     .from("timer:{{routeId}}?repeatCount=1&delay=-1")
                     .setBody().exchangeProperty(Exchange.TIMER_COUNTER)
-                    .to("direct:{{routeId}}");
+                    .to("kamelet:sink");
 
                 from("direct:templateEmbedded")
                     .toF("kamelet:setBody/embedded?bodyValue=embedded");
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
index d33a15b..67d6ff5 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
@@ -78,7 +78,7 @@ public class KameletPropertiesTest extends CamelTestSupport {
                 // template
                 routeTemplate("setBody")
                     .templateParameter("bodyValue")
-                    .from("direct:{{routeId}}")
+                    .from("kamelet:source")
                     .setBody().constant("{{bodyValue}}");
             }
         };
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
index 8aed313..7e8c345 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
@@ -18,16 +18,13 @@ package org.apache.camel.component.kamelet;
 
 import java.util.UUID;
 
-import org.apache.camel.CamelExecutionException;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.direct.DirectConsumerNotAvailableException;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.http.annotation.Obsolete;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 public class KameletRouteTest extends CamelTestSupport {
     @Test
@@ -48,15 +45,6 @@ public class KameletRouteTest extends CamelTestSupport {
         ).isEqualTo("b-a-" + body);
     }
 
-    @Test
-    public void testFailure()  {
-        String body = UUID.randomUUID().toString();
-
-        assertThatExceptionOfType(CamelExecutionException.class)
-            .isThrownBy(() -> fluentTemplate.toF("direct:fail").withBody(body).request(String.class))
-            .withCauseExactlyInstanceOf(DirectConsumerNotAvailableException.class);
-    }
-
     // **********************************************
     //
     // test set-up
@@ -70,12 +58,7 @@ public class KameletRouteTest extends CamelTestSupport {
             public void configure() throws Exception {
                 routeTemplate("echo")
                     .templateParameter("prefix")
-                    .from("direct:{{routeId}}")
-                    .setBody().simple("{{prefix}}-${body}");
-
-                routeTemplate("echo-fail")
-                    .templateParameter("prefix")
-                    .from("direct:#property:routeId")
+                    .from("kamelet:source")
                     .setBody().simple("{{prefix}}-${body}");
 
                 from("direct:single")
@@ -86,10 +69,6 @@ public class KameletRouteTest extends CamelTestSupport {
                     .to("kamelet:echo/1?prefix=a")
                     .to("kamelet:echo/2?prefix=b")
                     .log("${body}");
-
-                from("direct:fail")
-                    .to("kamelet:echo-fail?prefix=a")
-                    .log("${body}");
             }
         };
     }
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
index e16cf10..f35c8e3 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
@@ -34,7 +34,7 @@ public class KameletValidationTest {
             public void configure() throws Exception {
                 routeTemplate("setBody")
                     .templateParameter("bodyValue")
-                    .from("direct:{{routeId}}")
+                    .from("kamelet:source")
                     .setBody().constant("{{bodyValue}}");
 
                 from("direct:start")
diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml
index d5df1ad..8ce15f1 100644
--- a/components/camel-kamelet/src/test/resources/log4j2-test.xml
+++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml
@@ -32,9 +32,7 @@
     <Logger name="org.apache.camel.component.kamelet" level="TRACE"/>
 
     <Root level="INFO">
-      <!--
       <AppenderRef ref="STDOUT"/>
-      -->
       <AppenderRef ref="FILE"/>
     </Root>
   </Loggers>


[camel-k-runtime] 03/03: kamelet source/sink component #490

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 979288ecc735e8613a39ca5d627d2f288fb68c2f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Mon Sep 28 20:59:58 2020 +0200

    kamelet source/sink component #490
---
 .../src/main/resources/application.properties      |   2 +-
 .../src/test/resources/routes/set-body.yaml        |   2 +-
 .../src/test/resources/routes/to-upper.yaml        |   2 +-
 .../apache/camel/component/kamelet/kamelet.json    |   2 +-
 .../apache/camel/component/kamelet/Kamelet.java    |  97 ++++++++++++-
 .../camel/component/kamelet/KameletComponent.java  | 154 +++++++--------------
 .../camel/component/kamelet/KameletEndpoint.java   |   7 +-
 .../camel/component/kamelet/KameletProducer.java   |   2 +-
 .../src/test/resources/log4j2-test.xml             |   2 +
 9 files changed, 151 insertions(+), 119 deletions(-)

diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
index 3ce5493..5b8f41a 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
@@ -18,6 +18,6 @@
 #
 # Quarkus
 #
-quarkus.log.console.enable = false
+quarkus.log.console.enable = true
 quarkus.banner.enabled     = false
 
diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
index 55347e9..c311da1 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
@@ -16,7 +16,7 @@
 #
 
 - from:
-    uri: "direct:{{routeId}}"
+    uri: "kamelet:source"
     steps:
       - set-body:
           constant: "{{bodyValue}}"
\ No newline at end of file
diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
index 74105f1..ba51838 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
@@ -16,7 +16,7 @@
 #
 
 - from:
-    uri: "direct:{{routeId}}"
+    uri: "kamelet:source"
     steps:
       - set-body:
           constant: "{{message}}"
diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
index a66f1ca..4dd132f 100644
--- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
+++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
@@ -6,7 +6,7 @@
     "description": "The Kamelet Component provides support for interacting with Knative",
     "deprecated": false,
     "firstVersion": "3.5.0",
-    "label": "camel-k",
+    "label": "core",
     "javaType": "org.apache.camel.component.kamelet.KameletComponent",
     "supportLevel": "Preview",
     "groupId": "org.apache.camel.k",
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index 689c1ed..14ecc43 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -17,19 +17,32 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.function.Predicate;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.model.FromDefinition;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
+import org.apache.camel.model.RouteTemplateParameterDefinition;
+import org.apache.camel.model.ToDefinition;
 import org.apache.camel.spi.PropertiesComponent;
+import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.util.StringHelper;
 
+import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+
 public final class Kamelet {
     public static final String PROPERTIES_PREFIX = "camel.kamelet.";
     public static final String SCHEME = "kamelet";
     public static final String SOURCE_ID = "source";
     public static final String SINK_ID = "sink";
+    public static final String PARAM_ROUTE_ID = "routeId";
+    public static final String PARAM_TEMPLATE_ID = "templateId";
 
     private Kamelet() {
     }
@@ -38,9 +51,14 @@ public final class Kamelet {
         return item -> item.startsWith(prefix);
     }
 
-    public static String extractTemplateId(CamelContext context, String remaining) {
+    public static String extractTemplateId(CamelContext context, String remaining, Map<String, Object> parameters) {
+        Object param = parameters.get(PARAM_TEMPLATE_ID);
+        if (param != null) {
+            return CamelContextHelper.mandatoryConvertTo(context, String.class, param);
+        }
+
         if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
-            return context.resolvePropertyPlaceholders("{{templateId}}");
+            return context.resolvePropertyPlaceholders("{{" + PARAM_TEMPLATE_ID + "}}");
         }
 
         String answer = StringHelper.before(remaining, "/");
@@ -51,14 +69,19 @@ public final class Kamelet {
         return answer;
     }
 
-    public static String extractRouteId(CamelContext context, String remaining) {
+    public static String extractRouteId(CamelContext context, String remaining, Map<String, Object> parameters) {
+        Object param = parameters.get(PARAM_ROUTE_ID);
+        if (param != null) {
+            return CamelContextHelper.mandatoryConvertTo(context, String.class, param);
+        }
+
         if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
-            return context.resolvePropertyPlaceholders("{{routeId}}");
+            return context.resolvePropertyPlaceholders("{{" + PARAM_ROUTE_ID + "}}");
         }
 
         String answer = StringHelper.after(remaining, "/");
         if (answer == null) {
-            answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid();
+            answer = extractTemplateId(context, remaining, parameters) + "-" + context.getUuidGenerator().generateUuid();
         }
 
         return answer;
@@ -84,4 +107,68 @@ public final class Kamelet {
 
         return properties;
     }
+
+    /**
+     * Materializes a route from the route template identified by {@code routeTemplateId}
+     * and adds the resulting route definition to the given context.
+     * <p>
+     * Default values declared by the template are applied first and are then overridden by
+     * {@code parameters}. A {@code kamelet:source} input and any {@code kamelet:sink} outputs
+     * are rewritten to carry the route id so that each materialized route gets its own
+     * source and sink endpoints.
+     *
+     * @param context the context that holds the route templates and receives the new route definition
+     * @param routeId the id to assign to the new route; when {@code null} the id of the definition
+     *                created from the template is left untouched
+     * @param routeTemplateId the id of the route template to materialize
+     * @param parameters the user supplied template parameters; {@code null} is treated as "none"
+     * @return the id of the route definition that has been added
+     * @throws IllegalArgumentException if no template has the given id or if a parameter without
+     *                                  default value is not provided
+     * @throws Exception if the underlying model context operations fail
+     */
+    public static String addRouteFromTemplate(ModelCamelContext context, String routeId, String routeTemplateId, Map<String, Object> parameters) throws Exception {
+        RouteTemplateDefinition target = null;
+        for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
+            if (routeTemplateId.equals(def.getId())) {
+                target = def;
+                break;
+            }
+        }
+        if (target == null) {
+            throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
+        }
+
+        StringJoiner missing = new StringJoiner(", ");
+        final Map<String, Object> prop = new HashMap<>();
+        // include default values first from the template (and validate that we have inputs for all required parameters)
+        if (target.getTemplateParameters() != null) {
+            for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
+                if (temp.getDefaultValue() != null) {
+                    prop.put(temp.getName(), temp.getDefaultValue());
+                } else if (parameters == null || !parameters.containsKey(temp.getName())) {
+                    // a parameter without default is mandatory; a null parameter map provides nothing
+                    missing.add(temp.getName());
+                }
+            }
+        }
+        if (missing.length() > 0) {
+            throw new IllegalArgumentException(
+                "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
+                    + missing);
+        }
+        // then override with user parameters
+        if (parameters != null) {
+            prop.putAll(parameters);
+        }
+
+        RouteDefinition def = target.asRouteDefinition();
+        // must make deep copy of input
+        def.setInput(null);
+        def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
+        if (routeId != null) {
+            def.setId(routeId);
+        }
+        // the source and sink endpoints must be unique per route, so append the route id
+        // before the route is created from the template; both the "kamelet:" and the
+        // "kamelet://" forms of the uri are recognized
+        final String inputUri = def.getInput().getEndpointUri();
+        if (inputUri.startsWith("kamelet:source") || inputUri.startsWith("kamelet://source")) {
+            def.getInput().setUri("kamelet:source?" + PARAM_ROUTE_ID + "=" + routeId);
+        }
+        Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
+        while (it.hasNext()) {
+            ToDefinition to = it.next();
+            final String outputUri = to.getEndpointUri();
+            if (outputUri.startsWith("kamelet:sink") || outputUri.startsWith("kamelet://sink")) {
+                to.setUri("kamelet:sink?" + PARAM_ROUTE_ID + "=" + routeId);
+            }
+        }
+
+        def.setTemplateParameters(prop);
+        // drop any previously registered definition equal to this one before adding it again
+        context.removeRouteDefinition(def);
+        context.getRouteDefinitions().add(def);
+
+        return def.getId();
+    }
+
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 1cc92b2..9696c90 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -17,12 +17,9 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,14 +27,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.VetoCamelContextStartException;
-import org.apache.camel.model.EndpointRequiredDefinition;
-import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.ProcessorDefinitionHelper;
 import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.model.RouteTemplateDefinition;
-import org.apache.camel.model.RouteTemplateParameterDefinition;
-import org.apache.camel.model.ToDefinition;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.annotations.Component;
@@ -47,8 +38,9 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID;
+import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID;
+import static org.apache.camel.component.kamelet.Kamelet.addRouteFromTemplate;
 
 /**
  * The Kamelet Component provides support for materializing routes templates.
@@ -71,23 +63,53 @@ public class KameletComponent extends DefaultComponent {
     }
 
     @Override
-    public Endpoint createEndpoint(String uri) throws Exception {
-        return super.createEndpoint(uri);
-    }
-
-    @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
-        final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
-        final String newUri = "kamelet:" + templateId + "/" + routeId;
+        final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining, parameters);
+        final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining, parameters);
+
+        parameters.remove(PARAM_TEMPLATE_ID);
+        parameters.remove(PARAM_ROUTE_ID);
 
         final KameletEndpoint endpoint;
 
-        if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) {
-            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) {
+        if (Kamelet.SOURCE_ID.equals(remaining) || Kamelet.SINK_ID.equals(remaining)) {
+            //
+            // if remaining is either `source` or `sink` then it is a virtual
+            // endpoint that is used inside the kamelet definition to mark it
+            // as in/out endpoint.
+            //
+            // The following snippet defines a template which will act as a
+            // consumer for this Kamelet:
+            //
+            //     from("kamelet:source")
+            //         .to("log:info")
+            //
+            // The following snippet defines a template which will act as a
+            // producer for this Kamelet:
+            //
+            //     from("telegram:bots")
+            //         .to("kamelet:sink")
+            //
+            // Note that at the moment, there's no enforcement around `source`
+            // and `sink` to be defined on the right side (producer or consumer)
+            //
+            endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers);
+
+            // forward component properties
+            endpoint.setBlock(block);
+            endpoint.setTimeout(timeout);
+
+            // set endpoint specific properties
+            setProperties(endpoint, parameters);
+        } else {
+            endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers) {
                 @Override
                 protected void doInit() throws Exception {
                     super.doInit();
+                    //
+                    // since this is the real kamelet, then we need to hand it
+                    // over to the tracker.
+                    //
                     lifecycleHandler.track(this);
                 }
             };
@@ -110,20 +132,11 @@ public class KameletComponent extends DefaultComponent {
             //
             Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
             kameletProperties.putAll(parameters);
-            kameletProperties.put("templateId", templateId);
-            kameletProperties.put("routeId", routeId);
+            kameletProperties.put(PARAM_TEMPLATE_ID, templateId);
+            kameletProperties.put(PARAM_ROUTE_ID, routeId);
 
             // set kamelet specific properties
             endpoint.setKameletProperties(kameletProperties);
-        } else {
-            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers);
-
-            // forward component properties
-            endpoint.setBlock(block);
-            endpoint.setTimeout(timeout);
-
-            // set endpoint specific properties
-            setProperties(endpoint, parameters);
         }
 
         return endpoint;
@@ -193,8 +206,6 @@ public class KameletComponent extends DefaultComponent {
         @Override
         public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException {
             if (!this.initialized.compareAndExchange(false, true)) {
-                ModelCamelContext mcc = context.adapt(ModelCamelContext.class);
-
                 for (KameletEndpoint endpoint : endpoints) {
                     try {
                         createRouteForEndpoint(endpoint);
@@ -233,12 +244,14 @@ public class KameletComponent extends DefaultComponent {
 
             if (!def.isPrepared()) {
                 // when starting the route that was created from the template
-                // then we must provide the route id as local properties to the properties component
-                // as this route id is used internal by kamelets when they are starting
+                // then we must provide the route id as local properties to the
+                // properties component as this route id is used internally by
+                // kamelets when they are starting
                 PropertiesComponent pc = context.getPropertiesComponent();
                 try {
                     Properties prop = new Properties();
-                    prop.put("routeId", id);
+                    prop.put(PARAM_TEMPLATE_ID, endpoint.getTemplateId());
+                    prop.put(PARAM_ROUTE_ID, id);
                     pc.setLocalProperties(prop);
                     context.startRouteDefinitions(List.of(def));
                 } finally {
@@ -248,72 +261,5 @@ public class KameletComponent extends DefaultComponent {
 
             LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
         }
-
-        private static String addRouteFromTemplate(final ModelCamelContext context, final String routeId, final String routeTemplateId, final Map<String, Object> parameters)
-                throws Exception {
-            RouteTemplateDefinition target = null;
-            for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
-                if (routeTemplateId.equals(def.getId())) {
-                    target = def;
-                    break;
-                }
-            }
-            if (target == null) {
-                throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
-            }
-
-            StringJoiner templatesBuilder = new StringJoiner(", ");
-            final Map<String, Object> prop = new HashMap();
-            // include default values first from the template (and validate that we have inputs for all required parameters)
-            if (target.getTemplateParameters() != null) {
-                for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
-                    if (temp.getDefaultValue() != null) {
-                        prop.put(temp.getName(), temp.getDefaultValue());
-                    } else {
-                        // this is a required parameter do we have that as input
-                        if (!parameters.containsKey(temp.getName())) {
-                            templatesBuilder.add(temp.getName());
-                        }
-                    }
-                }
-            }
-            if (templatesBuilder.length() > 0) {
-                throw new IllegalArgumentException(
-                        "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
-                                + templatesBuilder.toString());
-            }
-            // then override with user parameters
-            if (parameters != null) {
-                prop.putAll(parameters);
-            }
-
-            RouteDefinition def = target.asRouteDefinition();
-            // must make deep copy of input
-            def.setInput(null);
-            def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
-            if (routeId != null) {
-                def.setId(routeId);
-            }
-            // must make the source and simk endpoints are unique by appending the route id before we create the route from the template
-            if (def.getInput().getEndpointUri().startsWith("kamelet:source")) {
-                def.getInput().setUri("kamelet:source?routeId=" + routeId);
-            }
-            Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
-            while (it.hasNext()) {
-                ToDefinition to = it.next();
-                if (to.getEndpointUri().startsWith("kamelet:sink")) {
-                    // TODO: must make deep copy
-                    to.setUri("kamelet:sink?routeId=" + routeId);
-                }
-            }
-
-
-            def.setTemplateParameters(prop);
-            context.removeRouteDefinition(def);
-            context.getRouteDefinitions().add(def);
-
-            return def.getId();
-        }
-
     }
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 2d6e883..c3760f3 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -20,6 +20,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -30,8 +31,6 @@ import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @UriEndpoint(
     firstVersion = "3.5.0",
@@ -39,10 +38,8 @@ import org.slf4j.LoggerFactory;
     syntax = "kamelet:templateId/routeId",
     title = "Kamelet",
     lenientProperties = true,
-    label = "camel-k")
+    category = Category.CORE)
 public class KameletEndpoint extends DefaultEndpoint {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class);
-
     @Metadata(required = true)
     @UriPath(description = "The Route Template ID")
     private final String templateId;
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index 9e6d86d..10bd42c 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -48,7 +48,7 @@ final class KameletProducer extends DefaultAsyncProducer {
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            final KameletConsumer consumer = getEndpoint().getConsumer();;
+            final KameletConsumer consumer = getEndpoint().getConsumer();
 
             if (consumer != null) {
                 return consumer.getAsyncProcessor().process(exchange, callback);
diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml
index 8ce15f1..d5df1ad 100644
--- a/components/camel-kamelet/src/test/resources/log4j2-test.xml
+++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml
@@ -32,7 +32,9 @@
     <Logger name="org.apache.camel.component.kamelet" level="TRACE"/>
 
     <Root level="INFO">
+      <!--
       <AppenderRef ref="STDOUT"/>
+      -->
       <AppenderRef ref="FILE"/>
     </Root>
   </Loggers>


[camel-k-runtime] 02/03: Make kamelet:sink and kamelet:source work with workaround here (need to find nicer solution in camel-core)

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit cb2b8934fbb7226e40c2ae965c84d9b768edc178
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Sep 28 18:17:18 2020 +0200

    Make kamelet:sink and kamelet:source work with workaround here (need to find nicer solution in camel-core)
---
 .../kamelet/KameletComponentConfigurer.java        |   6 ++
 .../kamelet/KameletEndpointConfigurer.java         |  11 +++
 .../apache/camel/component/kamelet/kamelet.json    |   5 +
 .../camel/component/kamelet/KameletComponent.java  | 101 ++++++++++++++++++++-
 .../camel/component/kamelet/KameletEndpoint.java   |   3 +
 5 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java
index a0a384d..7c3717a 100644
--- a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java
+++ b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletComponentConfigurer.java
@@ -21,10 +21,12 @@ public class KameletComponentConfigurer extends PropertyConfigurerSupport implem
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "block": target.setBlock(property(camelContext, boolean.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "timeout": target.setTimeout(property(camelContext, long.class, value)); return true;
         default: return false;
         }
     }
@@ -33,8 +35,10 @@ public class KameletComponentConfigurer extends PropertyConfigurerSupport implem
     public Map<String, Object> getAllOptions(Object target) {
         Map<String, Object> answer = new CaseInsensitiveMap();
         answer.put("basicPropertyBinding", boolean.class);
+        answer.put("block", boolean.class);
         answer.put("bridgeErrorHandler", boolean.class);
         answer.put("lazyStartProducer", boolean.class);
+        answer.put("timeout", long.class);
         return answer;
     }
 
@@ -44,10 +48,12 @@ public class KameletComponentConfigurer extends PropertyConfigurerSupport implem
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": return target.isBasicPropertyBinding();
+        case "block": return target.isBlock();
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "timeout": return target.getTimeout();
         default: return null;
         }
     }
diff --git a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
index bb99f63..6869355 100644
--- a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
+++ b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
@@ -21,15 +21,19 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "block": target.setBlock(property(camelContext, boolean.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
         case "exceptionhandler":
         case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
         case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+        case "kameletproperties":
+        case "kameletProperties": target.setKameletProperties(property(camelContext, java.util.Map.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
         case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
+        case "timeout": target.setTimeout(property(camelContext, long.class, value)); return true;
         default: return false;
         }
     }
@@ -38,11 +42,14 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
     public Map<String, Object> getAllOptions(Object target) {
         Map<String, Object> answer = new CaseInsensitiveMap();
         answer.put("basicPropertyBinding", boolean.class);
+        answer.put("block", boolean.class);
         answer.put("bridgeErrorHandler", boolean.class);
         answer.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class);
         answer.put("exchangePattern", org.apache.camel.ExchangePattern.class);
+        answer.put("kameletProperties", java.util.Map.class);
         answer.put("lazyStartProducer", boolean.class);
         answer.put("synchronous", boolean.class);
+        answer.put("timeout", long.class);
         return answer;
     }
 
@@ -52,15 +59,19 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "basicpropertybinding":
         case "basicPropertyBinding": return target.isBasicPropertyBinding();
+        case "block": return target.isBlock();
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "exceptionhandler":
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
         case "exchangePattern": return target.getExchangePattern();
+        case "kameletproperties":
+        case "kameletProperties": return target.getKameletProperties();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
         case "synchronous": return target.isSynchronous();
+        case "timeout": return target.getTimeout();
         default: return null;
         }
     }
diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
index 2da7754..a66f1ca 100644
--- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
+++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
@@ -22,7 +22,9 @@
   },
   "componentProperties": {
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by [...]
+    "block": { "kind": "property", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "description": "If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active." },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the r [...]
+    "timeout": { "kind": "property", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "30000", "description": "The timeout value to use if block is enabled." },
     "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" }
   },
   "properties": {
@@ -31,7 +33,10 @@
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "block": { "kind": "parameter", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "description": "If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active." },
+    "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Custom properties for kamelet" },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the  [...]
+    "timeout": { "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "30000", "description": "The timeout value to use if block is enabled." },
     "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
     "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." }
   }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 91a2514..1cc92b2 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -17,8 +17,12 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -26,9 +30,16 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.VetoCamelContextStartException;
+import org.apache.camel.model.EndpointRequiredDefinition;
+import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.ProcessorDefinitionHelper;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
+import org.apache.camel.model.RouteTemplateParameterDefinition;
+import org.apache.camel.model.ToDefinition;
 import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.LifecycleStrategySupport;
@@ -36,6 +47,9 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+
 /**
  * The Kamelet Component provides support for materializing routes templates.
  */
@@ -57,6 +71,11 @@ public class KameletComponent extends DefaultComponent {
     }
 
     @Override
+    public Endpoint createEndpoint(String uri) throws Exception {
+        return super.createEndpoint(uri);
+    }
+
+    @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
         final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
@@ -209,14 +228,92 @@ public class KameletComponent extends DefaultComponent {
             LOGGER.debug("Creating route from template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
 
             final ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
-            final String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+            final String id = addRouteFromTemplate(context, endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
             final RouteDefinition def = context.getRouteDefinition(id);
 
             if (!def.isPrepared()) {
-                context.startRouteDefinitions(List.of(def));
+                // when starting the route that was created from the template
+                // then we must provide the route id as local properties to the properties component
+                // as this route id is used internally by kamelets when they are starting
+                PropertiesComponent pc = context.getPropertiesComponent();
+                try {
+                    Properties prop = new Properties();
+                    prop.put("routeId", id);
+                    pc.setLocalProperties(prop);
+                    context.startRouteDefinitions(List.of(def));
+                } finally {
+                    pc.setLocalProperties(null);
+                }
             }
 
             LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
         }
+
+        private static String addRouteFromTemplate(final ModelCamelContext context, final String routeId, final String routeTemplateId, final Map<String, Object> parameters)
+                throws Exception {
+            RouteTemplateDefinition target = null;
+            for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
+                if (routeTemplateId.equals(def.getId())) {
+                    target = def;
+                    break;
+                }
+            }
+            if (target == null) {
+                throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
+            }
+
+            StringJoiner templatesBuilder = new StringJoiner(", ");
+            final Map<String, Object> prop = new HashMap();
+            // include default values first from the template (and validate that we have inputs for all required parameters)
+            if (target.getTemplateParameters() != null) {
+                for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
+                    if (temp.getDefaultValue() != null) {
+                        prop.put(temp.getName(), temp.getDefaultValue());
+                    } else {
+                        // this is a required parameter, so check that it has been provided as input
+                        if (!parameters.containsKey(temp.getName())) {
+                            templatesBuilder.add(temp.getName());
+                        }
+                    }
+                }
+            }
+            if (templatesBuilder.length() > 0) {
+                throw new IllegalArgumentException(
+                        "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
+                                + templatesBuilder.toString());
+            }
+            // then override with user parameters
+            if (parameters != null) {
+                prop.putAll(parameters);
+            }
+
+            RouteDefinition def = target.asRouteDefinition();
+            // must make deep copy of input
+            def.setInput(null);
+            def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
+            if (routeId != null) {
+                def.setId(routeId);
+            }
+            // must make sure the source and sink endpoints are unique by appending the route id before we create the route from the template
+            if (def.getInput().getEndpointUri().startsWith("kamelet:source")) {
+                def.getInput().setUri("kamelet:source?routeId=" + routeId);
+            }
+            Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
+            while (it.hasNext()) {
+                ToDefinition to = it.next();
+                if (to.getEndpointUri().startsWith("kamelet:sink")) {
+                    // TODO: must make deep copy
+                    to.setUri("kamelet:sink?routeId=" + routeId);
+                }
+            }
+
+
+            def.setTemplateParameters(prop);
+            context.removeRouteDefinition(def);
+            context.getRouteDefinitions().add(def);
+
+            return def.getId();
+        }
+
     }
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 80d8e10..2d6e883 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -127,6 +127,9 @@ public class KameletEndpoint extends DefaultEndpoint {
         return routeId;
     }
 
+    /**
+     * Custom properties for kamelet
+     */
     public void setKameletProperties(Map<String, Object> kameletProperties) {
         if (kameletProperties != null) {
             this.kameletProperties.clear();