You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 09:20:18 UTC

[camel-k-runtime] 01/03: kamelet source/sink component #490

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 4dede2ab1d61551b675b6bde82dae236511ac07e
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Mon Sep 28 11:56:22 2020 +0200

    kamelet source/sink component #490
---
 components/camel-kamelet/pom.xml                   |   9 +-
 .../apache/camel/component/kamelet/Kamelet.java    |  32 ++---
 .../camel/component/kamelet/KameletComponent.java  | 143 ++++++++++++++-----
 .../camel/component/kamelet/KameletConsumer.java   |  74 ++++++++++
 .../camel/component/kamelet/KameletEndpoint.java   | 157 ++++++++++++---------
 .../camel/component/kamelet/KameletProducer.java   |  70 +++++++++
 .../camel/component/kamelet/KameletBasicTest.java  |   4 +-
 .../component/kamelet/KameletPropertiesTest.java   |   2 +-
 .../camel/component/kamelet/KameletRouteTest.java  |  23 +--
 .../component/kamelet/KameletValidationTest.java   |   2 +-
 .../src/test/resources/log4j2-test.xml             |   2 -
 11 files changed, 361 insertions(+), 157 deletions(-)

diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml
index 21232ac..426c1f7 100644
--- a/components/camel-kamelet/pom.xml
+++ b/components/camel-kamelet/pom.xml
@@ -40,10 +40,6 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-engine</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-direct</artifactId>
-        </dependency>
 
         <!-- ****************************** -->
         <!--                                -->
@@ -78,6 +74,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-direct</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-test-junit5</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index d9ac81b..689c1ed 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -17,24 +17,19 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Predicate;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public final class Kamelet {
     public static final String PROPERTIES_PREFIX = "camel.kamelet.";
     public static final String SCHEME = "kamelet";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(Kamelet.class);
+    public static final String SOURCE_ID = "source";
+    public static final String SINK_ID = "sink";
 
     private Kamelet() {
     }
@@ -43,20 +38,11 @@ public final class Kamelet {
         return item -> item.startsWith(prefix);
     }
 
-    public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception {
-        LOGGER.debug("Creating route from template {}", endpoint.getTemplateId());
-
-        ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
-        String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
-        RouteDefinition def = context.getRouteDefinition(id);
-        if (!def.isPrepared()) {
-            context.startRouteDefinitions(List.of(def));
+    public static String extractTemplateId(CamelContext context, String remaining) {
+        if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
+            return context.resolvePropertyPlaceholders("{{templateId}}");
         }
 
-        LOGGER.debug("Route {} created from template {}", id, endpoint.getTemplateId());
-    }
-
-    public static  String extractTemplateId(CamelContext context, String remaining) {
         String answer = StringHelper.before(remaining, "/");
         if (answer == null) {
             answer = remaining;
@@ -65,7 +51,11 @@ public final class Kamelet {
         return answer;
     }
 
-    public static  String extractRouteId(CamelContext context, String remaining) {
+    public static String extractRouteId(CamelContext context, String remaining) {
+        if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
+            return context.resolvePropertyPlaceholders("{{routeId}}");
+        }
+
         String answer = StringHelper.after(remaining, "/");
         if (answer == null) {
             answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid();
@@ -74,7 +64,7 @@ public final class Kamelet {
         return answer;
     }
 
-    public static  Map<String, Object> extractKameletProperties(CamelContext context, String... elements) {
+    public static Map<String, Object> extractKameletProperties(CamelContext context, String... elements) {
         PropertiesComponent pc = context.getPropertiesComponent();
         Map<String, Object> properties = new HashMap<>();
         String prefix = Kamelet.PROPERTIES_PREFIX;
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 7a97d9c..91a2514 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -19,64 +19,120 @@ package org.apache.camel.component.kamelet;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.VetoCamelContextStartException;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * The Kamelet Component provides support for interacting with <a href="https://knative.dev">Knative</a>.
+ * The Kamelet Component provides support for materializing routes templates.
  */
 @Component(Kamelet.SCHEME)
 public class KameletComponent extends DefaultComponent {
-    private final LifecycleHandler lifecycleHandler;
+    private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
 
-    public KameletComponent() {
-        this(null);
-    }
+    private final Map<String, KameletConsumer> consumers;
+    private final LifecycleHandler lifecycleHandler;
 
-    public KameletComponent(CamelContext context) {
-        super(context);
+    @Metadata(label = "producer", defaultValue = "true")
+    private boolean block = true;
+    @Metadata(label = "producer", defaultValue = "30000")
+    private long timeout = 30000L;
 
+    public KameletComponent() {
         this.lifecycleHandler = new LifecycleHandler();
+        this.consumers = new ConcurrentHashMap<>();
     }
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
         final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
+        final String newUri = "kamelet:" + templateId + "/" + routeId;
+
+        final KameletEndpoint endpoint;
 
-        //
-        // The properties for the kamelets are determined by global properties
-        // and local endpoint parameters,
-        //
-        // Global parameters are loaded in the following order:
-        //
-        //   camel.kamelet." + templateId
-        //   camel.kamelet." + templateId + "." routeId
-        //
-        Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
-        kameletProperties.putAll(parameters);
-        kameletProperties.put("templateId", templateId);
-        kameletProperties.put("routeId", routeId);
-
-        // Remaining parameter should be related to the route and to avoid the
-        // parameters validation to fail, we need to clear the parameters map.
-        parameters.clear();
-
-        KameletEndpoint endpoint = new KameletEndpoint(uri, this, templateId, routeId, kameletProperties);
-
-        // No parameters are expected here.
-        setProperties(endpoint, parameters);
+        if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) {
+            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) {
+                @Override
+                protected void doInit() throws Exception {
+                    super.doInit();
+                    lifecycleHandler.track(this);
+                }
+            };
+
+            // forward component properties
+            endpoint.setBlock(block);
+            endpoint.setTimeout(timeout);
+
+            // set endpoint specific properties
+            setProperties(endpoint, parameters);
+
+            //
+            // The properties for the kamelets are determined by global properties
+            // and local endpoint parameters,
+            //
+            // Global parameters are loaded in the following order:
+            //
+            //   camel.kamelet." + templateId
+            //   camel.kamelet." + templateId + "." routeId
+            //
+            Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
+            kameletProperties.putAll(parameters);
+            kameletProperties.put("templateId", templateId);
+            kameletProperties.put("routeId", routeId);
+
+            // set kamelet specific properties
+            endpoint.setKameletProperties(kameletProperties);
+        } else {
+            endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers);
+
+            // forward component properties
+            endpoint.setBlock(block);
+            endpoint.setTimeout(timeout);
+
+            // set endpoint specific properties
+            setProperties(endpoint, parameters);
+        }
 
         return endpoint;
     }
 
+    public boolean isBlock() {
+        return block;
+    }
+
+    /**
+     * If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block
+     * and wait for the consumer to become active.
+     */
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * The timeout value to use if block is enabled.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
     @Override
     protected void doInit() throws Exception {
         getCamelContext().addLifecycleStrategy(lifecycleHandler);
@@ -91,11 +147,11 @@ public class KameletComponent extends DefaultComponent {
     @Override
     protected void doStop() throws Exception {
         getCamelContext().getLifecycleStrategies().remove(lifecycleHandler);
-        super.doStop();
-    }
 
-    void onEndpointAdd(KameletEndpoint endpoint) {
-        lifecycleHandler.track(endpoint);
+        ServiceHelper.stopService(consumers.values());
+        consumers.clear();
+
+        super.doStop();
     }
 
     /*
@@ -118,9 +174,11 @@ public class KameletComponent extends DefaultComponent {
         @Override
         public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException {
             if (!this.initialized.compareAndExchange(false, true)) {
+                ModelCamelContext mcc = context.adapt(ModelCamelContext.class);
+
                 for (KameletEndpoint endpoint : endpoints) {
                     try {
-                        Kamelet.createRouteForEndpoint(endpoint);
+                        createRouteForEndpoint(endpoint);
                     } catch (Exception e) {
                         throw new VetoCamelContextStartException("Failure creating route from template: " + endpoint.getTemplateId(), e, context);
                     }
@@ -137,13 +195,28 @@ public class KameletComponent extends DefaultComponent {
         public void track(KameletEndpoint endpoint) {
             if (this.initialized.get()) {
                 try {
-                    Kamelet.createRouteForEndpoint(endpoint);
+                    createRouteForEndpoint(endpoint);
                 } catch (Exception e) {
                     throw RuntimeCamelException.wrapRuntimeException(e);
                 }
             } else {
+                LOGGER.debug("Tracking route template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
                 this.endpoints.add(endpoint);
             }
         }
+
+        public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception {
+            LOGGER.debug("Creating route from template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
+
+            final ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
+            final String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+            final RouteDefinition def = context.getRouteDefinition(id);
+
+            if (!def.isPrepared()) {
+                context.startRouteDefinitions(List.of(def));
+            }
+
+            LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
+        }
     }
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
new file mode 100644
index 0000000..c99d56c
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.Suspendable;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.support.DefaultConsumer;
+
+final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
+    public KameletConsumer(KameletEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    public KameletEndpoint getEndpoint() {
+        return (KameletEndpoint)super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        getEndpoint().addConsumer(this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        getEndpoint().removeConsumer(this);
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        getEndpoint().removeConsumer(this);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        getEndpoint().addConsumer(this);
+    }
+
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // deny stopping on shutdown as we want kamelet consumers to run in
+        // case some other queues depend on this consumer to run, so it can
+        // complete its exchanges
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        // return 0 as we do not have an internal memory queue with a variable
+        // size of inflight messages.
+        return 0;
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // noop
+    }
+}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 3209c6d..80d8e10 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -17,23 +17,21 @@
 package org.apache.camel.component.kamelet;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProducer;
 import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
-import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @UriEndpoint(
     firstVersion = "3.5.0",
@@ -43,34 +41,67 @@ import org.apache.camel.util.ObjectHelper;
     lenientProperties = true,
     label = "camel-k")
 public class KameletEndpoint extends DefaultEndpoint {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class);
+
     @Metadata(required = true)
     @UriPath(description = "The Route Template ID")
     private final String templateId;
-
     @Metadata(required = false)
     @UriPath(description = "The Route ID", defaultValueNote = "The ID will be auto-generated if not provided")
     private final String routeId;
 
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean block = true;
+    @UriParam(label = "producer", defaultValue = "30000")
+    private long timeout = 30000L;
+    @UriParam(label = "producer", defaultValue = "true")
+
     private final Map<String, Object> kameletProperties;
-    private final String kameletUri;
+    private final Map<String, KameletConsumer> consumers;
+    private final String key;
 
     public KameletEndpoint(
             String uri,
             KameletComponent component,
             String templateId,
             String routeId,
-            Map<String, Object> kameletProperties) {
+            Map<String, KameletConsumer> consumers) {
 
         super(uri, component);
 
         ObjectHelper.notNull(templateId, "template id");
         ObjectHelper.notNull(routeId, "route id");
-        ObjectHelper.notNull(kameletProperties, "kamelet properties");
 
         this.templateId = templateId;
         this.routeId = routeId;
-        this.kameletProperties = Collections.unmodifiableMap(kameletProperties);
-        this.kameletUri = "direct:" + routeId;
+        this.key = templateId + "/" + routeId;
+        this.kameletProperties = new HashMap<>();
+        this.consumers = consumers;
+    }
+
+    public boolean isBlock() {
+        return block;
+    }
+
+    /**
+     * If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block
+     * and wait for the consumer to become active.
+     */
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * The timeout value to use if block is enabled.
+     *
+     * @param timeout the timeout value
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
     }
 
     @Override
@@ -83,6 +114,11 @@ public class KameletEndpoint extends DefaultEndpoint {
         return true;
     }
 
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
     public String getTemplateId() {
         return templateId;
     }
@@ -91,88 +127,71 @@ public class KameletEndpoint extends DefaultEndpoint {
         return routeId;
     }
 
+    public void setKameletProperties(Map<String, Object> kameletProperties) {
+        if (kameletProperties != null) {
+            this.kameletProperties.clear();
+            this.kameletProperties.putAll(kameletProperties);
+        }
+    }
+
     public Map<String, Object> getKameletProperties() {
-        return kameletProperties;
+        return Collections.unmodifiableMap(kameletProperties);
     }
 
     @Override
     public Producer createProducer() throws Exception {
-        return new KameletProducer();
+        return new KameletProducer(this);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer answer = new KemeletConsumer(processor);
+        Consumer answer = new KameletConsumer(this, processor);
         configureConsumer(answer);
         return answer;
     }
 
-    @Override
-    protected void doInit() throws Exception {
-        super.doInit();
-        getComponent().onEndpointAdd(this);
-    }
-
     // *********************************
     //
     // Helpers
     //
     // *********************************
 
-    private class KemeletConsumer extends DefaultConsumer {
-        private volatile Endpoint endpoint;
-        private volatile Consumer consumer;
-
-        public KemeletConsumer(Processor processor) {
-            super(KameletEndpoint.this, processor);
-        }
-
-        @Override
-        protected void doStart() throws Exception {
-            endpoint = getCamelContext().getEndpoint(kameletUri);
-            consumer = endpoint.createConsumer(getProcessor());
-
-            ServiceHelper.startService(endpoint, consumer);
-            super.doStart();
-        }
-
-        @Override
-        protected void doStop() throws Exception {
-            ServiceHelper.stopService(consumer, endpoint);
-            super.doStop();
+    void addConsumer(KameletConsumer consumer) {
+        synchronized (consumers) {
+            if (consumers.putIfAbsent(key, consumer) != null) {
+                throw new IllegalArgumentException(
+                    "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
+            }
+            consumers.notifyAll();
         }
     }
 
-    private class KameletProducer extends DefaultAsyncProducer {
-        private volatile Endpoint endpoint;
-        private volatile AsyncProducer producer;
-
-        public KameletProducer() {
-            super(KameletEndpoint.this);
+    void removeConsumer(KameletConsumer consumer) {
+        synchronized (consumers) {
+            consumers.remove(key, consumer);
+            consumers.notifyAll();
         }
+    }
 
-        @Override
-        public boolean process(Exchange exchange, AsyncCallback callback) {
-            if (producer != null) {
-                return producer.process(exchange, callback);
-            } else {
-                callback.done(true);
-                return true;
+    KameletConsumer getConsumer() throws InterruptedException {
+        synchronized (consumers) {
+            KameletConsumer answer = consumers.get(key);
+            if (answer == null && block) {
+                StopWatch watch = new StopWatch();
+                for (; ; ) {
+                    answer =consumers.get(key);
+                    if (answer != null) {
+                        break;
+                    }
+                    long rem = timeout - watch.taken();
+                    if (rem <= 0) {
+                        break;
+                    }
+                    consumers.wait(rem);
+                }
             }
-        }
-
-        @Override
-        protected void doStart() throws Exception {
-            endpoint = getCamelContext().getEndpoint(kameletUri);
-            producer = endpoint.createAsyncProducer();
-            ServiceHelper.startService(endpoint, producer);
-            super.doStart();
-        }
 
-        @Override
-        protected void doStop() throws Exception {
-            ServiceHelper.stopService(producer, endpoint);
-            super.doStop();
+            return answer;
         }
     }
 }
\ No newline at end of file
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
new file mode 100644
index 0000000..9e6d86d
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+
+final class KameletProducer extends DefaultAsyncProducer {
+    public KameletProducer(KameletEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public KameletEndpoint getEndpoint() {
+        return (KameletEndpoint)super.getEndpoint();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        final KameletConsumer consumer = getEndpoint().getConsumer();
+
+        if (consumer != null) {
+            consumer.getProcessor().process(exchange);
+        } else {
+            exchange.setException(
+                new CamelExchangeException(
+                    "No consumers available on endpoint: " + getEndpoint(), exchange)
+            );
+        }
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            final KameletConsumer consumer = getEndpoint().getConsumer();;
+
+            if (consumer != null) {
+                return consumer.getAsyncProcessor().process(exchange, callback);
+            } else {
+                exchange.setException(
+                    new CamelExchangeException(
+                        "No consumers available on endpoint: " + getEndpoint(), exchange)
+                );
+
+                callback.done(true);
+                return true;
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+}
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
index e023e07..5826f21 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
@@ -78,13 +78,13 @@ public class KameletBasicTest extends CamelTestSupport {
             public void configure() throws Exception {
                 routeTemplate("setBody")
                     .templateParameter("bodyValue")
-                    .from("direct:{{routeId}}")
+                    .from("kamelet:source")
                     .setBody().constant("{{bodyValue}}");
 
                 routeTemplate("tick")
                     .from("timer:{{routeId}}?repeatCount=1&delay=-1")
                     .setBody().exchangeProperty(Exchange.TIMER_COUNTER)
-                    .to("direct:{{routeId}}");
+                    .to("kamelet:sink");
 
                 from("direct:templateEmbedded")
                     .toF("kamelet:setBody/embedded?bodyValue=embedded");
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
index d33a15b..67d6ff5 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
@@ -78,7 +78,7 @@ public class KameletPropertiesTest extends CamelTestSupport {
                 // template
                 routeTemplate("setBody")
                     .templateParameter("bodyValue")
-                    .from("direct:{{routeId}}")
+                    .from("kamelet:source")
                     .setBody().constant("{{bodyValue}}");
             }
         };
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
index 8aed313..7e8c345 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
@@ -18,16 +18,13 @@ package org.apache.camel.component.kamelet;
 
 import java.util.UUID;
 
-import org.apache.camel.CamelExecutionException;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.direct.DirectConsumerNotAvailableException;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.http.annotation.Obsolete;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 public class KameletRouteTest extends CamelTestSupport {
     @Test
@@ -48,15 +45,6 @@ public class KameletRouteTest extends CamelTestSupport {
         ).isEqualTo("b-a-" + body);
     }
 
-    @Test
-    public void testFailure()  {
-        String body = UUID.randomUUID().toString();
-
-        assertThatExceptionOfType(CamelExecutionException.class)
-            .isThrownBy(() -> fluentTemplate.toF("direct:fail").withBody(body).request(String.class))
-            .withCauseExactlyInstanceOf(DirectConsumerNotAvailableException.class);
-    }
-
     // **********************************************
     //
     // test set-up
@@ -70,12 +58,7 @@ public class KameletRouteTest extends CamelTestSupport {
             public void configure() throws Exception {
                 routeTemplate("echo")
                     .templateParameter("prefix")
-                    .from("direct:{{routeId}}")
-                    .setBody().simple("{{prefix}}-${body}");
-
-                routeTemplate("echo-fail")
-                    .templateParameter("prefix")
-                    .from("direct:#property:routeId")
+                    .from("kamelet:source")
                     .setBody().simple("{{prefix}}-${body}");
 
                 from("direct:single")
@@ -86,10 +69,6 @@ public class KameletRouteTest extends CamelTestSupport {
                     .to("kamelet:echo/1?prefix=a")
                     .to("kamelet:echo/2?prefix=b")
                     .log("${body}");
-
-                from("direct:fail")
-                    .to("kamelet:echo-fail?prefix=a")
-                    .log("${body}");
             }
         };
     }
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
index e16cf10..f35c8e3 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
@@ -34,7 +34,7 @@ public class KameletValidationTest {
             public void configure() throws Exception {
                 routeTemplate("setBody")
                     .templateParameter("bodyValue")
-                    .from("direct:{{routeId}}")
+                    .from("kamelet:source")
                     .setBody().constant("{{bodyValue}}");
 
                 from("direct:start")
diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml
index d5df1ad..8ce15f1 100644
--- a/components/camel-kamelet/src/test/resources/log4j2-test.xml
+++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml
@@ -32,9 +32,7 @@
     <Logger name="org.apache.camel.component.kamelet" level="TRACE"/>
 
     <Root level="INFO">
-      <!--
       <AppenderRef ref="STDOUT"/>
-      -->
       <AppenderRef ref="FILE"/>
     </Root>
   </Loggers>