You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/12/07 13:51:52 UTC

[camel] 01/08: CAMEL-17261: camel-yaml-dsl - Add support for loading Camel K KameletBinding file. WIP.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 30b2fbc7b761231bf6ed5f2e38156d49e00a96b9
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Dec 7 07:03:17 2021 +0100

    CAMEL-17261: camel-yaml-dsl - Add support for loading Camel K KameletBinding file. WIP.
---
 .../dsl/yaml/deserializers/ModelDeserializers.java |  37 ---
 .../deserializers/ModelDeserializersResolver.java  |   3 -
 dsl/camel-yaml-dsl/camel-yaml-dsl/pom.xml          |  10 +
 .../src/generated/resources/camel-yaml-dsl.json    |   9 -
 .../camel/dsl/yaml/YamlRoutesBuilderLoader.java    | 269 ++++++++++++-----
 .../dsl/yaml/YamlRoutesBuilderLoaderSupport.java   |  37 +++
 .../camel/dsl/yaml/KameletBindingLoaderTest.groovy | 335 +++++++++++++++++++++
 .../camel/dsl/yaml/support/YamlTestSupport.groovy  |  22 ++
 .../test/resources/kamelets/log-sink.kamelet.yaml  |  61 ++++
 .../resources/kamelets/prefix-action.kamelet.yaml  |  45 +++
 .../resources/kamelets/timer-source.kamelet.yaml   |  69 +++++
 11 files changed, 776 insertions(+), 121 deletions(-)

diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 1804ce1..a6eb0f8 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -69,7 +69,6 @@ import org.apache.camel.model.Resilience4jConfigurationDefinition;
 import org.apache.camel.model.RestContextRefDefinition;
 import org.apache.camel.model.RollbackDefinition;
 import org.apache.camel.model.RouteBuilderDefinition;
-import org.apache.camel.model.RouteConfigurationContextRefDefinition;
 import org.apache.camel.model.RouteContextRefDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RouteTemplateParameterDefinition;
@@ -12919,42 +12918,6 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
     }
 
     @YamlType(
-            types = org.apache.camel.model.RouteConfigurationContextRefDefinition.class,
-            order = org.apache.camel.dsl.yaml.common.YamlDeserializerResolver.ORDER_LOWEST - 1,
-            nodes = {
-                    "route-configuration-context-ref",
-                    "routeConfigurationContextRef"
-            },
-            properties = @YamlProperty(name = "ref", type = "string", required = true)
-    )
-    public static class RouteConfigurationContextRefDefinitionDeserializer extends YamlDeserializerBase<RouteConfigurationContextRefDefinition> {
-        public RouteConfigurationContextRefDefinitionDeserializer() {
-            super(RouteConfigurationContextRefDefinition.class);
-        }
-
-        @Override
-        protected RouteConfigurationContextRefDefinition newInstance() {
-            return new RouteConfigurationContextRefDefinition();
-        }
-
-        @Override
-        protected boolean setProperty(RouteConfigurationContextRefDefinition target,
-                String propertyKey, String propertyName, Node node) {
-            switch(propertyKey) {
-                case "ref": {
-                    String val = asText(node);
-                    target.setRef(val);
-                    break;
-                }
-                default: {
-                    return false;
-                }
-            }
-            return true;
-        }
-    }
-
-    @YamlType(
             types = org.apache.camel.model.RouteContextRefDefinition.class,
             order = org.apache.camel.dsl.yaml.common.YamlDeserializerResolver.ORDER_LOWEST - 1,
             nodes = {
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
index f0629b1..f5837a5 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
@@ -363,9 +363,6 @@ public final class ModelDeserializersResolver implements YamlDeserializerResolve
             case "route-builder": return new ModelDeserializers.RouteBuilderDefinitionDeserializer();
             case "routeBuilder": return new ModelDeserializers.RouteBuilderDefinitionDeserializer();
             case "org.apache.camel.model.RouteBuilderDefinition": return new ModelDeserializers.RouteBuilderDefinitionDeserializer();
-            case "route-configuration-context-ref": return new ModelDeserializers.RouteConfigurationContextRefDefinitionDeserializer();
-            case "routeConfigurationContextRef": return new ModelDeserializers.RouteConfigurationContextRefDefinitionDeserializer();
-            case "org.apache.camel.model.RouteConfigurationContextRefDefinition": return new ModelDeserializers.RouteConfigurationContextRefDefinitionDeserializer();
             case "route-context-ref": return new ModelDeserializers.RouteContextRefDefinitionDeserializer();
             case "routeContextRef": return new ModelDeserializers.RouteContextRefDefinitionDeserializer();
             case "org.apache.camel.model.RouteContextRefDefinition": return new ModelDeserializers.RouteContextRefDefinitionDeserializer();
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/pom.xml b/dsl/camel-yaml-dsl/camel-yaml-dsl/pom.xml
index 787cfb2..7170e1d4 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/pom.xml
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/pom.xml
@@ -116,6 +116,16 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-timer</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-stub</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-log</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
index 62e992b..89dfbf3 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
@@ -2131,15 +2131,6 @@
         } ],
         "required" : [ "ref" ]
       },
-      "org.apache.camel.model.RouteConfigurationContextRefDefinition" : {
-        "type" : "object",
-        "properties" : {
-          "ref" : {
-            "type" : "string"
-          }
-        },
-        "required" : [ "ref" ]
-      },
       "org.apache.camel.model.RouteConfigurationDefinition" : {
         "oneOf" : [ {
           "type" : "string"
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
index 30cd3fa..fa7ca59 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
@@ -16,11 +16,12 @@
  */
 package org.apache.camel.dsl.yaml;
 
-import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.builder.DeadLetterChannelBuilder;
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.builder.RouteConfigurationBuilder;
@@ -35,11 +36,15 @@ import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.model.rest.VerbDefinition;
 import org.apache.camel.spi.CamelContextCustomizer;
 import org.apache.camel.spi.annotations.RoutesLoader;
+import org.apache.camel.util.URISupport;
 import org.snakeyaml.engine.v2.nodes.MappingNode;
 import org.snakeyaml.engine.v2.nodes.Node;
 import org.snakeyaml.engine.v2.nodes.NodeTuple;
 import org.snakeyaml.engine.v2.nodes.NodeType;
+import org.snakeyaml.engine.v2.nodes.SequenceNode;
 
+import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asMap;
+import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asMappingNode;
 import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asSequenceNode;
 import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asText;
 import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.nodeAt;
@@ -47,8 +52,15 @@ import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.nodeAt;
 @ManagedResource(description = "Managed YAML RoutesBuilderLoader")
 @RoutesLoader(YamlRoutesBuilderLoader.EXTENSION)
 public class YamlRoutesBuilderLoader extends YamlRoutesBuilderLoaderSupport {
+
     public static final String EXTENSION = "yaml";
 
+    // API versions for Camel-K Integration and Kamelet Binding
+    // we are lenient, so let's just assume we can work with any of the v1 versions even as they evolve
+    private static final String INTEGRATION_VERSION = "camel.apache.org/v1";
+    private static final String BINDING_VERSION = "camel.apache.org/v1";
+    private static final String STRIMZI_VERSION = "kafka.strimzi.io/v1";
+
     public YamlRoutesBuilderLoader() {
         super(EXTENSION);
     }
@@ -57,103 +69,216 @@ public class YamlRoutesBuilderLoader extends YamlRoutesBuilderLoaderSupport {
         return new RouteConfigurationBuilder() {
             @Override
             public void configure() throws Exception {
-                Node target = preConfigureNode(root);
-
-                for (Node node : asSequenceNode(target).getValue()) {
-                    Object item = getDeserializationContext().mandatoryResolve(node).construct(node);
-
-                    if (item instanceof OutputAwareFromDefinition) {
-                        RouteDefinition route = new RouteDefinition();
-                        route.setInput(((OutputAwareFromDefinition) item).getDelegate());
-                        route.setOutputs(((OutputAwareFromDefinition) item).getOutputs());
-
-                        CamelContextAware.trySetCamelContext(getRouteCollection(), getCamelContext());
-                        getRouteCollection().route(route);
-                    } else if (item instanceof RouteDefinition) {
-                        CamelContextAware.trySetCamelContext(getRouteCollection(), getCamelContext());
-                        getRouteCollection().route((RouteDefinition) item);
-                    } else if (item instanceof CamelContextCustomizer) {
-                        ((CamelContextCustomizer) item).configure(getCamelContext());
-                    } else if (item instanceof OnExceptionDefinition) {
-                        if (!getRouteCollection().getRoutes().isEmpty()) {
-                            throw new IllegalArgumentException(
-                                    "onException must be defined before any routes in the RouteBuilder");
-                        }
-                        CamelContextAware.trySetCamelContext(getRouteCollection(), getCamelContext());
-                        getRouteCollection().getOnExceptions().add((OnExceptionDefinition) item);
-                    } else if (item instanceof ErrorHandlerBuilder) {
-                        if (!getRouteCollection().getRoutes().isEmpty()) {
-                            throw new IllegalArgumentException(
-                                    "errorHandler must be defined before any routes in the RouteBuilder");
-                        }
-                        errorHandler((ErrorHandlerBuilder) item);
-                    } else if (item instanceof RouteTemplateDefinition) {
-                        CamelContextAware.trySetCamelContext(getRouteTemplateCollection(), getCamelContext());
-                        getRouteTemplateCollection().routeTemplate((RouteTemplateDefinition) item);
-                    } else if (item instanceof RestDefinition) {
-                        RestDefinition definition = (RestDefinition) item;
-                        for (VerbDefinition verb : definition.getVerbs()) {
-                            verb.setRest(definition);
-                        }
-                        CamelContextAware.trySetCamelContext(getRestCollection(), getCamelContext());
-                        getRestCollection().rest(definition);
-                    } else if (item instanceof RestConfigurationDefinition) {
-                        ((RestConfigurationDefinition) item).asRestConfiguration(
-                                getCamelContext(),
-                                getCamelContext().getRestConfiguration());
+                Object target = preConfigureNode(root);
+                if (target == null) {
+                    return;
+                }
+
+                if (target instanceof Node) {
+                    SequenceNode seq = asSequenceNode((Node) target);
+                    for (Node node : seq.getValue()) {
+                        Object item = getDeserializationContext().mandatoryResolve(node).construct(node);
+                        doConfigure(item);
+                    }
+                } else {
+                    doConfigure(target);
+                }
+            }
+
+            private void doConfigure(Object item) throws Exception {
+                if (item instanceof OutputAwareFromDefinition) {
+                    RouteDefinition route = new RouteDefinition();
+                    route.setInput(((OutputAwareFromDefinition) item).getDelegate());
+                    route.setOutputs(((OutputAwareFromDefinition) item).getOutputs());
+
+                    CamelContextAware.trySetCamelContext(getRouteCollection(), getCamelContext());
+                    getRouteCollection().route(route);
+                } else if (item instanceof RouteDefinition) {
+                    CamelContextAware.trySetCamelContext(getRouteCollection(), getCamelContext());
+                    getRouteCollection().route((RouteDefinition) item);
+                } else if (item instanceof CamelContextCustomizer) {
+                    ((CamelContextCustomizer) item).configure(getCamelContext());
+                } else if (item instanceof OnExceptionDefinition) {
+                    if (!getRouteCollection().getRoutes().isEmpty()) {
+                        throw new IllegalArgumentException(
+                                "onException must be defined before any routes in the RouteBuilder");
+                    }
+                    CamelContextAware.trySetCamelContext(getRouteCollection(), getCamelContext());
+                    getRouteCollection().getOnExceptions().add((OnExceptionDefinition) item);
+                } else if (item instanceof ErrorHandlerBuilder) {
+                    if (!getRouteCollection().getRoutes().isEmpty()) {
+                        throw new IllegalArgumentException(
+                                "errorHandler must be defined before any routes in the RouteBuilder");
+                    }
+                    errorHandler((ErrorHandlerBuilder) item);
+                } else if (item instanceof RouteTemplateDefinition) {
+                    CamelContextAware.trySetCamelContext(getRouteTemplateCollection(), getCamelContext());
+                    getRouteTemplateCollection().routeTemplate((RouteTemplateDefinition) item);
+                } else if (item instanceof RestDefinition) {
+                    RestDefinition definition = (RestDefinition) item;
+                    for (VerbDefinition verb : definition.getVerbs()) {
+                        verb.setRest(definition);
                     }
+                    CamelContextAware.trySetCamelContext(getRestCollection(), getCamelContext());
+                    getRestCollection().rest(definition);
+                } else if (item instanceof RestConfigurationDefinition) {
+                    ((RestConfigurationDefinition) item).asRestConfiguration(
+                            getCamelContext(),
+                            getCamelContext().getRestConfiguration());
                 }
             }
 
             @Override
             public void configuration() throws Exception {
-                Node target = preConfigureNode(root);
+                Object target = preConfigureNode(root);
+                if (target == null) {
+                    return;
+                }
 
-                for (Node node : asSequenceNode(target).getValue()) {
-                    Object item = getDeserializationContext().mandatoryResolve(node).construct(node);
-                    if (item instanceof RouteConfigurationDefinition) {
-                        CamelContextAware.trySetCamelContext(getRouteConfigurationCollection(), getCamelContext());
-                        getRouteConfigurationCollection().routeConfiguration((RouteConfigurationDefinition) item);
+                if (target instanceof Node) {
+                    SequenceNode seq = asSequenceNode((Node) target);
+                    for (Node node : seq.getValue()) {
+                        Object item = getDeserializationContext().mandatoryResolve(node).construct(node);
+                        doConfiguration(item);
                     }
+                } else {
+                    doConfiguration(target);
+                }
+            }
+
+            private void doConfiguration(Object item) {
+                if (item instanceof RouteConfigurationDefinition) {
+                    CamelContextAware.trySetCamelContext(getRouteConfigurationCollection(), getCamelContext());
+                    getRouteConfigurationCollection().routeConfiguration((RouteConfigurationDefinition) item);
                 }
             }
         };
     }
 
-    private static Node preConfigureNode(Node root) {
-        Node target = root;
+    private Object preConfigureNode(Node root) throws Exception {
+        Object target = root;
 
-        // check if the yaml is a camel-k yaml with embedded routes (called flow(s))
+        // check if the yaml is a camel-k yaml with embedded binding/routes (called flow(s))
         if (Objects.equals(root.getNodeType(), NodeType.MAPPING)) {
             final MappingNode mn = YamlDeserializerSupport.asMappingNode(root);
-            boolean camelk = anyTupleMatches(mn.getValue(), "apiVersion", "camel.apache.org/v1") &&
+            // camel-k: integration
+            boolean integration = anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(INTEGRATION_VERSION)) &&
                     anyTupleMatches(mn.getValue(), "kind", "Integration");
-            if (camelk) {
-                Node routes = nodeAt(root, "/spec/flows");
-                if (routes == null) {
-                    routes = nodeAt(root, "/spec/flow");
-                }
-                if (routes != null) {
-                    target = routes;
-                }
+            // camel-k: kamelet bindings are still at v1alpha1 (accepted by the v1 prefix match below)
+            boolean binding = anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(BINDING_VERSION)) &&
+                    anyTupleMatches(mn.getValue(), "kind", "KameletBinding");
+            if (integration) {
+                target = preConfigureIntegration(root, target);
+            } else if (binding) {
+                target = preConfigureKameletBinding(root, target);
             }
         }
 
         return target;
     }
 
-    private static boolean anyTupleMatches(List<NodeTuple> list, String aKey, String aValue) {
-        for (NodeTuple tuple : list) {
-            final String key = asText(tuple.getKeyNode());
-            final Node val = tuple.getValueNode();
-            if (Objects.equals(aKey, key) && NodeType.SCALAR.equals(val.getNodeType())) {
-                String value = asText(tuple.getValueNode());
-                if (Objects.equals(aValue, value)) {
-                    return true;
+    /**
+     * Camel K Integration file
+     */
+    private Object preConfigureIntegration(Node root, Object target) {
+        Node routes = nodeAt(root, "/spec/flows");
+        if (routes == null) {
+            routes = nodeAt(root, "/spec/flow");
+        }
+        if (routes != null) {
+            target = routes;
+        }
+        return target;
+    }
+
+    /**
+     * Camel K Kamelet Binding file
+     */
+    private Object preConfigureKameletBinding(Node root, Object target) throws Exception {
+        final RouteDefinition route = new RouteDefinition();
+        String routeId = asText(nodeAt(root, "/metadata/name"));
+        if (routeId != null) {
+            route.routeId(routeId);
+        }
+
+        // kamelet binding is a bit more complex, so grab the source and sink
+        // and map those to Camel route definitions
+        MappingNode source = asMappingNode(nodeAt(root, "/spec/source"));
+        MappingNode sink = asMappingNode(nodeAt(root, "/spec/sink"));
+        if (source != null && sink != null) {
+            // source at the beginning (mandatory)
+            String from = extractCamelEndpointUri(source);
+            route.from(from);
+
+            // steps in the middle (optional)
+            Node steps = nodeAt(root, "/spec/steps");
+            if (steps != null) {
+                SequenceNode sn = asSequenceNode(steps);
+                for (Node node : sn.getValue()) {
+                    MappingNode step = asMappingNode(node);
+                    String uri = extractCamelEndpointUri(step);
+                    if (uri != null) {
+                        route.to(uri);
+                    }
                 }
             }
+
+            // sink is at the end (mandatory)
+            String to = extractCamelEndpointUri(sink);
+            route.to(to);
+
+            // is there any error handler?
+            // TODO: set it globally via configuration so it's inherited
+            MappingNode errorHandler = asMappingNode(nodeAt(root, "/spec/errorHandler"));
+            if (errorHandler != null) {
+                // there are 5 different error handlers, which one is it
+                NodeTuple nt = errorHandler.getValue().get(0);
+                String ehName = asText(nt.getKeyNode());
+                if ("dead-letter-channel".equals(ehName)) {
+                    DeadLetterChannelBuilder dlcb = new DeadLetterChannelBuilder();
+               }
+            }
+
+            target = route;
+        }
+
+        return target;
+    }
+
+    private String extractCamelEndpointUri(MappingNode node) throws Exception {
+        MappingNode mn = null;
+        Node ref = nodeAt(node, "/ref");
+        if (ref != null) {
+            mn = asMappingNode(ref);
+        }
+
+        // the uri comes from the ref name for a kamelet (or strimzi kafka topic), otherwise from the uri property
+        boolean kamelet = mn != null && anyTupleMatches(mn.getValue(), "kind", "Kamelet");
+        boolean strimzi
+                = !kamelet && mn != null && anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(STRIMZI_VERSION))
+                        && anyTupleMatches(mn.getValue(), "kind", "KafkaTopic");
+        String uri;
+        if (kamelet || strimzi) {
+            uri = extractTupleValue(mn.getValue(), "name");
+        } else {
+            uri = extractTupleValue(node.getValue(), "uri");
+        }
+
+        // properties
+        MappingNode prop = asMappingNode(nodeAt(node, "/properties"));
+        Map<String, Object> params = asMap(prop);
+        if (params != null && !params.isEmpty()) {
+            String query = URISupport.createQueryString(params);
+            uri = uri + "?" + query;
+        }
+
+        if (kamelet) {
+            return "kamelet:" + uri;
+        } else if (strimzi) {
+            return "kafka:" + uri;
+        } else {
+            return uri;
         }
-        return false;
     }
 
 }
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoaderSupport.java b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoaderSupport.java
index ed74dc6..e6821e0 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoaderSupport.java
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoaderSupport.java
@@ -18,8 +18,11 @@ package org.apache.camel.dsl.yaml;
 
 import java.io.FileNotFoundException;
 import java.io.InputStream;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.dsl.support.RouteBuilderLoaderSupport;
@@ -36,10 +39,14 @@ import org.snakeyaml.engine.v2.api.LoadSettings;
 import org.snakeyaml.engine.v2.api.YamlUnicodeReader;
 import org.snakeyaml.engine.v2.composer.Composer;
 import org.snakeyaml.engine.v2.nodes.Node;
+import org.snakeyaml.engine.v2.nodes.NodeTuple;
+import org.snakeyaml.engine.v2.nodes.NodeType;
 import org.snakeyaml.engine.v2.parser.Parser;
 import org.snakeyaml.engine.v2.parser.ParserImpl;
 import org.snakeyaml.engine.v2.scanner.StreamReader;
 
+import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asText;
+
 public abstract class YamlRoutesBuilderLoaderSupport extends RouteBuilderLoaderSupport {
     public static final String DESERIALIZATION_MODE = "CamelYamlDslDeserializationMode";
 
@@ -130,4 +137,34 @@ public abstract class YamlRoutesBuilderLoaderSupport extends RouteBuilderLoaderS
     }
 
     protected abstract RouteBuilder builder(Node node);
+
+    protected boolean anyTupleMatches(List<NodeTuple> list, String aKey, String aValue) {
+        return anyTupleMatches(list, aKey, Predicate.isEqual(aValue));
+    }
+
+    protected boolean anyTupleMatches(List<NodeTuple> list, String aKey, Predicate<String> predicate) {
+        for (NodeTuple tuple : list) {
+            final String key = asText(tuple.getKeyNode());
+            final Node val = tuple.getValueNode();
+            if (Objects.equals(aKey, key) && NodeType.SCALAR.equals(val.getNodeType())) {
+                String value = asText(tuple.getValueNode());
+                if (predicate.test(value)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    protected String extractTupleValue(List<NodeTuple> list, String aKey) {
+        for (NodeTuple tuple : list) {
+            final String key = asText(tuple.getKeyNode());
+            final Node val = tuple.getValueNode();
+            if (Objects.equals(aKey, key) && NodeType.SCALAR.equals(val.getNodeType())) {
+                return asText(tuple.getValueNode());
+            }
+        }
+        return null;
+    }
+
 }
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/KameletBindingLoaderTest.groovy b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/KameletBindingLoaderTest.groovy
new file mode 100644
index 0000000..d80319e
--- /dev/null
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/KameletBindingLoaderTest.groovy
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.yaml
+
+import org.apache.camel.dsl.yaml.support.YamlTestSupport
+import org.apache.camel.model.ToDefinition
+
+class KameletBindingLoaderTest extends YamlTestSupport {
+    @Override
+    def doSetup() { // setup hook overridden from YamlTestSupport
+        context.start() // start the Camel context before each spec loads its binding
+    }
+
+    def "kamelet binding from kamelet to kamelet"() { // source and sink are both Kamelet references
+        when:
+            loadBindings('''
+                apiVersion: camel.apache.org/v1alpha1
+                kind: KameletBinding
+                metadata:
+                  name: timer-event-source                  
+                spec:
+                  source:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: timer-source
+                    properties:
+                      message: "Hello world!"
+                  sink:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: log-sink
+            ''')
+        then:
+            context.routeDefinitions.size() == 3 // presumably the binding route plus one per referenced Kamelet - confirm
+
+            with (context.routeDefinitions[0]) {
+                routeId == 'timer-event-source' // same text as metadata.name in the binding
+                input.endpointUri == 'kamelet:timer-source?message=Hello+world%21' // property shows up URL-encoded in the query
+                outputs.size() == 1
+                with (outputs[0], ToDefinition) {
+                    endpointUri == 'kamelet:log-sink'
+                }
+            }
+    }
+
+    def "kamelet binding from uri to kamelet"() { // plain endpoint uri as source, Kamelet reference as sink
+        when:
+        loadBindings('''
+                apiVersion: camel.apache.org/v1alpha1
+                kind: KameletBinding
+                metadata:
+                  name: timer-event-source                  
+                spec:
+                  source:
+                    uri: timer:foo
+                  sink:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: log-sink
+            ''')
+        then:
+        context.routeDefinitions.size() == 2 // one fewer than the kamelet-to-kamelet spec
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'timer-event-source' // same text as metadata.name in the binding
+            input.endpointUri == 'timer:foo' // the source uri is used unchanged
+            outputs.size() == 1
+            with (outputs[0], ToDefinition) {
+                endpointUri == 'kamelet:log-sink'
+            }
+        }
+    }
+
+    def "kamelet binding from uri to uri"() { // neither end references a Kamelet
+        when:
+        loadBindings('''
+                apiVersion: camel.apache.org/v1alpha1
+                kind: KameletBinding
+                metadata:
+                  name: timer-event-source                  
+                spec:
+                  source:
+                    uri: timer:foo
+                  sink:
+                    uri: log:bar
+            ''')
+        then:
+        context.routeDefinitions.size() == 1 // only the binding route itself is expected
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'timer-event-source' // same text as metadata.name in the binding
+            input.endpointUri == 'timer:foo'
+            outputs.size() == 1
+            with (outputs[0], ToDefinition) {
+                endpointUri == 'log:bar' // the sink uri is used unchanged
+            }
+        }
+    }
+
+    def "kamelet binding steps"() { // Kamelet source, two Kamelet steps, uri sink
+        when:
+        loadBindings('''
+            apiVersion: camel.apache.org/v1alpha1
+            kind: KameletBinding
+            metadata:
+              name: steps-binding
+            spec:
+              source:
+                ref:
+                  kind: Kamelet
+                  apiVersion: camel.apache.org/v1alpha1
+                  name: timer-source
+                properties:
+                  message: "Camel"
+              steps:
+              - ref:
+                  kind: Kamelet
+                  apiVersion: camel.apache.org/v1alpha1
+                  name: prefix-action
+                properties:
+                  prefix: "Apache"
+              - ref:
+                  kind: Kamelet
+                  apiVersion: camel.apache.org/v1alpha1
+                  name: prefix-action
+                properties:
+                  prefix: "Hello"
+              sink:
+                uri: log:info
+                ''')
+        then:
+        context.routeDefinitions.size() == 4 // presumably the binding route plus one per Kamelet reference (source and two steps) - confirm
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'steps-binding' // same text as metadata.name in the binding
+            input.endpointUri == 'kamelet:timer-source?message=Camel'
+            outputs.size() == 3 // two steps followed by the sink
+            with (outputs[2], ToDefinition) { // only the last output (the sink) is inspected here
+                endpointUri == 'log:info'
+            }
+        }
+    }
+
+    def "kamelet binding steps kamelet uri"() { // steps mix a Kamelet reference with a plain uri
+        when:
+        loadBindings('''
+            apiVersion: camel.apache.org/v1alpha1
+            kind: KameletBinding
+            metadata:
+              name: steps-binding
+            spec:
+              source:
+                ref:
+                  kind: Kamelet
+                  apiVersion: camel.apache.org/v1alpha1
+                  name: timer-source
+                properties:
+                  message: "Camel"
+              steps:
+              - ref:
+                  kind: Kamelet
+                  apiVersion: camel.apache.org/v1alpha1
+                  name: prefix-action
+                properties:
+                  prefix: "Apache"
+              - uri: mock:dummy
+              sink:
+                uri: log:info
+                ''')
+        then:
+        context.routeDefinitions.size() == 3 // presumably the binding route plus one per Kamelet reference (source and one step) - confirm
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'steps-binding' // same text as metadata.name in the binding
+            input.endpointUri == 'kamelet:timer-source?message=Camel'
+            outputs.size() == 3 // two steps followed by the sink
+            with (outputs[1], ToDefinition) { // second step: the plain uri
+                endpointUri == 'mock:dummy'
+            }
+            with (outputs[2], ToDefinition) { // last output: the sink
+                endpointUri == 'log:info'
+            }
+        }
+    }
+
+    def "kamelet binding steps uri uri"() { // both steps are plain endpoint uris
+        when:
+        loadBindings('''
+            apiVersion: camel.apache.org/v1alpha1
+            kind: KameletBinding
+            metadata:
+              name: steps-binding
+            spec:
+              source:
+                ref:
+                  kind: Kamelet
+                  apiVersion: camel.apache.org/v1alpha1
+                  name: timer-source
+                properties:
+                  message: "Camel"
+              steps:
+              - uri: mock:dummy
+              - uri: mock:dummy2
+              sink:
+                uri: log:info
+                ''')
+        then:
+        context.routeDefinitions.size() == 2 // only the source references a Kamelet
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'steps-binding' // same text as metadata.name in the binding
+            input.endpointUri == 'kamelet:timer-source?message=Camel'
+            outputs.size() == 3 // two steps followed by the sink, in declaration order
+            with (outputs[0], ToDefinition) {
+                endpointUri == 'mock:dummy'
+            }
+            with (outputs[1], ToDefinition) {
+                endpointUri == 'mock:dummy2'
+            }
+            with (outputs[2], ToDefinition) {
+                endpointUri == 'log:info'
+            }
+        }
+    }
+
+    def "kamelet binding from kamelet to strimzi"() { // sink is a Strimzi KafkaTopic reference instead of a Kamelet
+        when:
+
+        // stub kafka for testing as it requires to setup connection to a real kafka broker
+        context.addComponent("kafka", context.getComponent("stub"))
+
+        loadBindings('''
+                apiVersion: camel.apache.org/v1alpha1
+                kind: KameletBinding
+                metadata:
+                  name: timer-event-source                  
+                spec:
+                  source:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: timer-source
+                    properties:
+                      message: "Hello world!"
+                  sink:
+                    ref:
+                      kind: KafkaTopic
+                      apiVersion: kafka.strimzi.io/v1beta2
+                      name: my-topic
+            ''')
+        then:
+        context.routeDefinitions.size() == 2 // only the source references a Kamelet
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'timer-event-source' // same text as metadata.name in the binding
+            input.endpointUri == 'kamelet:timer-source?message=Hello+world%21' // property shows up URL-encoded in the query
+            outputs.size() == 1
+            with (outputs[0], ToDefinition) {
+                endpointUri == 'kafka:my-topic' // the KafkaTopic name becomes the kafka endpoint path
+            }
+        }
+    }
+
+    // Binding with Kamelet source and sink plus an errorHandler section (dead-letter-channel to a Kamelet).
+    def "kamelet binding with error handler"() {
+        when:
+
+        // stub kafka for testing as it requires to setup connection to a real kafka broker
+        // NOTE(review): presumably only relevant for the error-handler kamelet (kafka-* properties) - confirm
+        context.addComponent("kafka", context.getComponent("stub"))
+
+        loadBindings('''
+                apiVersion: camel.apache.org/v1alpha1
+                kind: KameletBinding
+                metadata:
+                  name: timer-event-source                  
+                spec:
+                  source:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: timer-source
+                    properties:
+                      message: "Hello world!"
+                  sink:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1alpha1
+                      name: log-sink
+                  errorHandler:
+                    dead-letter-channel:
+                      endpoint:
+                        ref:
+                          kind: Kamelet
+                          apiVersion: camel.apache.org/v1alpha1
+                          name: error-handler
+                        properties:
+                          log-message: "ERROR!"
+                          kafka-brokers: my-broker
+                          kafka-topic: my-first-test
+                          kafka-service-account-id: scott
+                          kafka-service-account-secret: tiger
+                      parameters:
+                        maximumRedeliveries: 1
+                        redeliveryDelay: 2000    
+                    ''')
+        then:
+        // same source/sink pair as the kamelet-to-kamelet spec, which expects 3 route definitions
+        // NOTE(review): the errorHandler section itself is not asserted yet - extend the expectations
+        // (and this count, if the error-handler kamelet adds a route) once it is wired in
+        context.routeDefinitions.size() == 3
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'timer-event-source'
+            input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
+            outputs.size() == 1
+            with (outputs[0], ToDefinition) {
+                // the sink references the log-sink Kamelet, not a KafkaTopic
+                endpointUri == 'kamelet:log-sink'
+            }
+        }
+    }
+
+}
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
index efd01cf..e9da693 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
@@ -25,6 +25,7 @@ import org.apache.camel.CamelContext
 import org.apache.camel.FluentProducerTemplate
 import org.apache.camel.builder.RouteBuilder
 import org.apache.camel.component.mock.MockEndpoint
+import org.apache.camel.dsl.yaml.KameletRoutesBuilderLoader
 import org.apache.camel.dsl.yaml.YamlRoutesBuilderLoader
 import org.apache.camel.dsl.yaml.common.YamlDeserializationMode
 import org.apache.camel.impl.DefaultCamelContext
@@ -84,6 +85,17 @@ class YamlTestSupport extends Specification implements HasCamelContext {
         )
     }
 
+    // Varargs convenience: copies the resources into a list and hands it to the collection based overload.
+    def loadKamelets(Resource... resources) {
+        List<Resource> asList = resources.toList()
+        loadKamelets(asList)
+    }
+
+    // Creates a KameletRoutesBuilderLoader bound to the test context, starts it and calls
+    // loadRoutesBuilder once per resource.
+    // NOTE(review): the result of loadRoutesBuilder is discarded here - confirm nothing needs adding to the context.
+    def loadKamelets(Collection<Resource> resources) {
+        KameletRoutesBuilderLoader loader = new KameletRoutesBuilderLoader()
+        loader.setCamelContext(context)
+        loader.start()
+        for (Resource resource : resources) {
+            loader.loadRoutesBuilder(resource)
+        }
+    }
+
     def loadKamelets(String... resources) {
         int index = 0
 
@@ -94,6 +106,16 @@ class YamlTestSupport extends Specification implements HasCamelContext {
         )
     }
 
+    // Wraps each YAML snippet (indentation stripped) in an in-memory resource named
+    // binding-<n>.yaml, numbered from 0, and loads them all through the context's routes loader.
+    def loadBindings(String... resources) {
+        int counter = 0
+        def bindings = resources.collect { yaml ->
+            ResourceHelper.fromString("binding-${counter++}.yaml", yaml.stripIndent())
+        }
+
+        context.routesLoader.loadRoutes(bindings)
+    }
+
     def withMock(
             String uri,
             @DelegatesTo(MockEndpoint) Closure<?> closure) {
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/log-sink.kamelet.yaml b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/log-sink.kamelet.yaml
new file mode 100644
index 0000000..54b37d52
--- /dev/null
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/log-sink.kamelet.yaml
@@ -0,0 +1,61 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: log-sink
+  annotations:
+    camel.apache.org/kamelet.support.level: "Preview"
+    camel.apache.org/catalog.version: "main-SNAPSHOT"
+    camel.apache.org/kamelet.icon: " [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Logging"
+  labels:
+    camel.apache.org/kamelet.type: "sink"
+spec:
+  definition:
+    title: "Log Sink"
+    description: |-
+      A sink that logs all data that it receives, useful for debugging purposes.
+    type: object
+    properties:
+      showHeaders:
+        title: Show Headers
+        description: Show the headers received
+        type: boolean
+        default: false
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+      showStreams:
+        title: Show Streams
+        description: Show the stream bodies (they may not be available in following steps)
+        type: boolean
+        default: false
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+  dependencies:
+    - "camel:kamelet"
+    - "camel:log"
+  flow:
+    from:
+      uri: "kamelet:source"
+      steps:
+        - to:
+            uri: "log:info"
+            parameters:
+              showHeaders: "{{?showHeaders}}"
+              showStreams: "{{?showStreams}}"
\ No newline at end of file
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/prefix-action.kamelet.yaml b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/prefix-action.kamelet.yaml
new file mode 100644
index 0000000..e8e9b6a
--- /dev/null
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/prefix-action.kamelet.yaml
@@ -0,0 +1,45 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: prefix-action
+  labels:
+    camel.apache.org/kamelet.type: "step"
+spec:
+  definition:
+    title: "Prefix"
+    description: "Adds a prefix to the incoming payload"
+    required:
+      - prefix
+    properties:
+      prefix:
+        title: Prefix
+        description: The prefix to add
+        type: string
+  types:
+    in:
+      mediaType: text/plain
+    out:
+      mediaType: text/plain
+  flow:
+    from:
+      uri: "kamelet:source"
+      steps:
+        - set-body:
+            simple: "{{prefix}} ${body}"
\ No newline at end of file
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/timer-source.kamelet.yaml b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/timer-source.kamelet.yaml
new file mode 100644
index 0000000..96195c1
--- /dev/null
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/timer-source.kamelet.yaml
@@ -0,0 +1,69 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: timer-source
+  annotations:
+    camel.apache.org/kamelet.support.level: "Preview"
+    camel.apache.org/catalog.version: "main-SNAPSHOT"
+    camel.apache.org/kamelet.icon:  [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Timer"
+  labels:
+    camel.apache.org/kamelet.type: source
+    camel.apache.org/kamelet.verified: "true"
+spec:
+  definition:
+    title: Timer Source
+    description: Produces periodic events with a custom payload.
+    required:
+      - message
+    type: object
+    properties:
+      period:
+        title: Period
+        description: The interval between two events in milliseconds
+        type: integer
+        default: 1000
+      message:
+        title: Message
+        description: The message to generate
+        type: string
+        example: hello world
+      contentType:
+        title: Content Type
+        description: The content type of the message being generated
+        type: string
+        default: text/plain
+  dependencies:
+    - "camel:core"
+    - "camel:timer"
+    - "camel:kamelet"
+  flow:
+    from:
+      uri: timer:tick
+      parameters:
+        period: "{{period}}"
+      steps:
+        - set-body:
+            constant: "{{message}}"
+        - set-header:
+            name: "Content-Type"
+            constant: "{{contentType}}"
+        - to: kamelet:sink
\ No newline at end of file