You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/08/22 10:37:51 UTC

[camel-kafka-connector] branch kamelets updated (8f4fb81 -> 5873cbc)

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a change to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


 discard 8f4fb81  Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog
 discard c1c6dac  Polished tests timeouts.
 discard 7055189  Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
 discard c653fdd  Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sources
 discard 8324995  Related to #423 : caonverted source and sink to use camel-kamelets.
     add e33723d  [create-pull-request] automated change
     add a0fd11a  Updated CHANGELOG.md
     add af24edb  Updated CHANGELOG.md
     add 4b515af  Upgrade Camel to version 3.11.1
     add 7b290e3  Regen
     add f443541  Updated CHANGELOG.md
     add c2242ab  [create-pull-request] automated change
     add 15e146a  Updated CHANGELOG.md
     add 4f4cbe1  Updated CHANGELOG.md
     add 85fb758  Updated CHANGELOG.md
     new 251fde7  Related to #423 : caonverted source and sink to use camel-kamelets.
     new 112e13f  Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sources
     new 68682b1  Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
     new 7044f32  Polished tests timeouts.
     new 8bbd570  Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog
     new 5873cbc  Related to #423 added autogeneration of kamelets

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (8f4fb81)
            \
             N -- N -- N   refs/heads/kamelets (5873cbc)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.md                                       |   9 +
 .../resources/connectors/camel-aws2-s3-sink.json   |   4 +-
 .../resources/connectors/camel-aws2-s3-source.json |   4 +-
 .../generated/resources/camel-aws2-s3-sink.json    |   4 +-
 .../generated/resources/camel-aws2-s3-source.json  |   4 +-
 .../docs/camel-aws2-s3-kafka-sink-connector.adoc   |   4 +-
 .../docs/camel-aws2-s3-kafka-source-connector.adoc |   4 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  10 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   9 +-
 .../camel-aws2-s3-kafka-sink-connector.adoc        |   4 +-
 .../camel-aws2-s3-kafka-source-connector.adoc      |   4 +-
 parent/pom.xml                                     |   9 +-
 pom.xml                                            |   2 +-
 .../pom.xml                                        |  22 ++
 .../AbstractCamelComponentKafkaConnectorMojo.java  |  20 +-
 .../maven/AbstractCamelKafkaConnectorMojo.java     |  16 +-
 .../AbstractCamelKameletKafkaConnectorMojo.java    |  20 +-
 .../maven/CamelKafkaConnectorCreateMojo.java       |   4 +-
 .../maven/CamelKafkaConnectorDeleteMojo.java       |   4 +-
 ...a => CamelKafkaConnectorKameletCreateMojo.java} |  20 +-
 ...a => CamelKafkaConnectorKameletUpdateMojo.java} | 233 +++++++++------------
 .../maven/CamelKafkaConnectorUpdateMojo.java       |  12 +-
 .../maven/GenerateCamelKafkaConnectorsMojo.java    | 183 +++++++++++++---
 .../kafkaconnector/maven/model/KameletModel.java   |  92 ++++++++
 .../maven/model/KameletPropertyModel.java}         |  74 ++++---
 .../maven/utils/YamlKameletMapper.java             |  90 ++++++++
 ...mel-kafka-connector-fix-dependencies.properties |  10 +-
 ...-connector-kamelet-fix-dependencies.properties} |   0
 ...-kafka-connector-kamelet-template-pom.template} |   4 -
 .../maven/GenerateCamelKafkaConnectorsMojoIT.java  |  30 +--
 .../maven/utils/YamlKameletMapperTests.java        |  81 +++++++
 .../test_generate/pom.xml                          |  11 +
 ...-connector-kamelet-fix-dependencies.properties} |   0
 .../resources/kamelets/aws-s3-source.kamelet.yaml  |  83 ++++++++
 .../resources/kamelets/nodependencies.kamelet.yaml |  80 +++++++
 .../resources/kamelets/noproperties.kamelet.yaml   |  33 +++
 .../resources/template-connecotr-kamelet-pom.xml}  |   4 -
 tooling/pom.xml                                    |   1 +
 38 files changed, 900 insertions(+), 298 deletions(-)
 copy tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchCommon.java => tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelComponentKafkaConnectorMojo.java (60%)
 copy tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchCommon.java => tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKameletKafkaConnectorMojo.java (58%)
 copy tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/{CamelKafkaConnectorCreateMojo.java => CamelKafkaConnectorKameletCreateMojo.java} (89%)
 copy tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/{CamelKafkaConnectorUpdateMojo.java => CamelKafkaConnectorKameletUpdateMojo.java} (82%)
 create mode 100644 tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletModel.java
 copy tooling/{camel-kafka-connector-model/src/main/java/org/apache/camel/kafkaconnector/model/CamelKafkaConnectorOptionModel.java => camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletPropertyModel.java} (55%)
 create mode 100644 tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapper.java
 copy tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/{camel-kafka-connector-fix-dependencies.properties => camel-kafka-connector-kamelet-fix-dependencies.properties} (100%)
 copy tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/{camel-kafka-connector-template-pom.template => camel-kafka-connector-kamelet-template-pom.template} (96%)
 create mode 100644 tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapperTests.java
 copy tooling/camel-kafka-connector-generator-maven-plugin/src/{main/resources/camel-kafka-connector-fix-dependencies.properties => test/resources/camel-kafka-connector-kamelet-fix-dependencies.properties} (100%)
 create mode 100644 tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/aws-s3-source.kamelet.yaml
 create mode 100644 tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/nodependencies.kamelet.yaml
 create mode 100644 tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/noproperties.kamelet.yaml
 copy tooling/camel-kafka-connector-generator-maven-plugin/src/{main/resources/camel-kafka-connector-template-pom.template => test/resources/template-connecotr-kamelet-pom.xml} (96%)

[camel-kafka-connector] 02/06: Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sources

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 112e13f17bec3dbe9d0814cc76b15c714de4a7d9
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat May 15 07:59:12 2021 +0200

    Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sources
---
 .../utils/CamelKafkaConnectMain.java               | 29 +++++++++++-----------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  5 ----
 2 files changed, 14 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 6e7dbdf..036375b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -228,16 +228,15 @@ public class CamelKafkaConnectMain extends SimpleMain {
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
-            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//            //dataformats
-//            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-//            }
-//            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-//            }
+            //dataformats
+            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+            }
+            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+            }
 
             //aggregator
             if (!ObjectHelper.isEmpty(aggregationSize)) {
@@ -310,9 +309,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
                             .templateParameter("fromUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
-                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//                            .templateParameter("marshall", "dummyDataformat")
-//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            .templateParameter("marshall", "dummyDataformat")
+                            .templateParameter("unmarshall", "dummyDataformat")
 
                             //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -327,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
                             .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat);
+                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b1271ac..36ae9e2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -235,9 +235,6 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
-
-
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
                 .forEach(e -> {
@@ -261,8 +258,6 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
-
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("seda"))
                 .forEach(e -> {

[camel-kafka-connector] 01/06: Related to #423 : caonverted source and sink to use camel-kamelets.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 251fde7e0130611f97862100c77e372723020429
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Apr 26 01:38:42 2021 +0200

    Related to #423 : caonverted source and sink to use camel-kamelets.
---
 core/pom.xml                                       |  10 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   5 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   5 +-
 .../utils/CamelKafkaConnectMain.java               | 272 ++++++++++++---------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  10 +-
 .../camel/kafkaconnector/DataFormatTest.java       |  10 +-
 6 files changed, 191 insertions(+), 121 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index f6c014f..3138163 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -53,12 +53,20 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-kafka</artifactId>
+            <artifactId>camel-kamelet</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-languages</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-xml-jaxb</artifactId>
+        </dependency>
 
         <!-- Tools -->
         <dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82c16d2..4e5a201 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -42,6 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelSinkTask extends SinkTask {
+    public static final String KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSink.";
+
     public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key";
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -119,8 +121,9 @@ public class CamelSinkTask extends SinkTask {
                                                 CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
+            actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl)
+            cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink")
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 00ce145..77ce636 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 
 public class CamelSourceTask extends SourceTask {
+    public static final String KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSource.";
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
 
@@ -145,8 +146,9 @@ public class CamelSourceTask extends SourceTask {
                                                 config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
+            actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl)
+            cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl)
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
@@ -171,6 +173,7 @@ public class CamelSourceTask extends SourceTask {
             consumer.start();
 
             cms.start();
+
             LOG.info("CamelSourceTask connector task started");
         } catch (Exception e) {
             throw new ConnectException("Failed to create and start Camel context", e);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index d031b20..6e7dbdf 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -26,10 +26,16 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.DefaultErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.builder.NoErrorHandlerBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.CamelSourceTask;
 import org.apache.camel.main.SimpleMain;
-import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -40,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelKafkaConnectMain extends SimpleMain {
-    public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
     private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
 
     protected volatile ConsumerTemplate consumerTemplate;
@@ -140,67 +145,67 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.aggregationTimeout = aggregationTimeout;
             return this;
         }
-        
+
         public Builder withErrorHandler(String errorHandler) {
             this.errorHandler = errorHandler;
             return this;
         }
-        
+
         public Builder withMaxRedeliveries(int maxRedeliveries) {
             this.maxRedeliveries = maxRedeliveries;
             return this;
         }
-        
+
         public Builder withRedeliveryDelay(long redeliveryDelay) {
             this.redeliveryDelay = redeliveryDelay;
             return this;
         }
-        
+
         public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
             this.idempotencyEnabled = idempotencyEnabled;
             return this;
         }
-        
+
         public Builder withExpressionType(String expressionType) {
             this.expressionType = expressionType;
             return this;
         }
-        
+
         public Builder withExpressionHeader(String expressionHeader) {
             this.expressionHeader = expressionHeader;
             return this;
         }
-        
+
         public Builder withMemoryDimension(int memoryDimension) {
             this.memoryDimension = memoryDimension;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryType(String idempotentRepositoryType) {
             this.idempotentRepositoryType = idempotentRepositoryType;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) {
             this.idempotentRepositoryTopicName = idempotentRepositoryTopicName;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) {
             this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaMaxCacheSize(int idempotentRepositoryKafkaMaxCacheSize) {
             this.idempotentRepositoryKafkaMaxCacheSize = idempotentRepositoryKafkaMaxCacheSize;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaPollDuration(int idempotentRepositoryKafkaPollDuration) {
             this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration;
             return this;
         }
-        
+
         public Builder withHeadersExcludePattern(String headersExcludePattern) {
             this.headersExcludePattern = headersExcludePattern;
             return this;
@@ -214,21 +219,51 @@ public class CamelKafkaConnectMain extends SimpleMain {
             return entry.getKey() + "=" + entry.getValue();
         }
 
-
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
             camelMain.configure().setAutoConfigurationLogSummary(false);
+            //TODO: make it configurable
+            camelMain.configure().setDumpRoutes(true);
 
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
-            List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//            //dataformats
+//            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+//            }
+//            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+//            }
 
-            LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
-            camelMain.setInitialProperties(camelProperties);
-            
-            // Instantianting the idempotent Repository here and inject it in registry to be referenced
+            //aggregator
+            if (!ObjectHelper.isEmpty(aggregationSize)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+            }
+            if (!ObjectHelper.isEmpty(aggregationTimeout)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+            }
+
+            //idempotency
             if (idempotencyEnabled) {
+                switch (expressionType) {
+                    case "body":
+                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        break;
+                    case "header":
+                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        break;
+                    default:
+                        break;
+                }
+                // Instantiating the idempotent Repository here and inject it in registry to be referenced
                 IdempotentRepository idempotentRepo = null;
                 switch (idempotentRepositoryType) {
                     case "memory":
@@ -240,110 +275,123 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     default:
                         break;
                 }
-                camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
+                camelMain.getCamelContext().getRegistry().bind("ckcIdempotentRepository", idempotentRepo);
+            }
+
+            //remove headers
+            if (!ObjectHelper.isEmpty(headersExcludePattern)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+            }
+
+            List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+            LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
+            camelMain.setInitialProperties(camelProperties);
+
+            //error handler
+            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
+            if (errorHandler != null) {
+                switch (errorHandler) {
+                    case "no":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
+                        break;
+                    case "default":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+                        break;
+                    default:
+                        break;
+                }
             }
 
-            //creating the actual route
             camelMain.configure().addRoutesBuilder(new RouteBuilder() {
                 public void configure() {
-                    //from
-                    RouteDefinition rd = from(from);
-                    LOG.info("Creating Camel route from({})", from);
-                    
-                    if (!ObjectHelper.isEmpty(errorHandler)) {
-                        switch (errorHandler) {
-                            case "no":
-                                rd.errorHandler(noErrorHandler());
-                                break;
-                            case "default":
-                                rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
-                                break;
-                            default:
-                                break;
-                        }
+
+                    //creating source template
+                    RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
+                            .templateParameter("fromUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//                            .templateParameter("marshall", "dummyDataformat")
+//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+                            .templateParameter("aggregationSize", "1")
+                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+                            .templateParameter("idempotentExpression", "dummyExpression")
+                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+                            .templateParameter("headersExcludePattern", "(?!)");
+
+
+                    ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
+                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                        rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat);
+                    }
+                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                        rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat);
                     }
 
-                    //dataformats
+                    if (getContext().getRegistry().lookupByName("aggregate") != null) {
+                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
+                        rdInTemplateSource = rdInTemplateSource.aggregate(s)
+                                .constant(true)
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}");
+                    }
+
+                    if (idempotencyEnabled) {
+                        rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                    }
+
+                    rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
+                            .to("kamelet:sink");
+
+                    //creating sink template
+                    RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
+                            .templateParameter("toUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//                            .templateParameter("marshall", "dummyDataformat")
+//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+                            .templateParameter("aggregationSize", "1")
+                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+                            .templateParameter("idempotentExpression", "dummyExpression")
+                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+                            .templateParameter("headersExcludePattern", "(?!)");
+
+
+                    ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        LOG.info(".marshal({})", marshallDataFormat);
-                        rd.marshal(marshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat);
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        LOG.info(".unmarshal({})", unmarshallDataFormat);
-                        rd.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat);
                     }
+
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                        //aggregation
                         AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        if (idempotencyEnabled) {
-                            switch (expressionType) {
-                                case "body":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + "
-                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
-                                    LOG.info(".to({})", to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                case "header":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + "
-                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
-                                    LOG.info(".to({})", to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                default:
-                                    break;
-                            }
-                        } else {
-                            LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
-                            LOG.info(".to({})", to);
-                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
-                            } else {
-                                rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to);
-                            }
-                        }
-                    } else {
-                        if (idempotencyEnabled) {
-                            switch (expressionType) {
-                                case "body":
-                                    LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                case "header":
-                                    LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                default:
-                                    break;
-                            }
-                        } else {
-                            //to
-                            LOG.info(".to({})", to);
-                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                rd.toD(to);
-                            } else {
-                                rd.removeHeaders(headersExcludePattern).toD(to);
-                            }
-                        }
+                        rdInTemplateSink = rdInTemplateSink.aggregate(s)
+                                .constant(true)
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}");
                     }
+
+                    if (idempotencyEnabled) {
+                        rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                    }
+
+                    rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
+                            .to("{{toUrl}}");
+
+                    //creating the actual route
+                    from(from).toD(to);
                 }
             });
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 5c99ad0..b1271ac 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CamelSourceTaskTest {
-
     private static final String DIRECT_URI = "direct:start";
     private static final String TOPIC_NAME = "my-topic";
 
@@ -225,7 +224,7 @@ public class CamelSourceTaskTest {
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() {
+    public void testUrlPrecedenceOnComponentProperty() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "timer:foo?period=10&repeatCount=2");
@@ -236,7 +235,8 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
+
 
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
@@ -261,10 +261,10 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
 
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
-                .filter(e -> e.getEndpointUri().startsWith("direct"))
+                .filter(e -> e.getEndpointUri().startsWith("seda"))
                 .forEach(e -> {
                     assertTrue(e.getEndpointUri().contains("end"));
                     assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 6715843..c3d26a4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
@@ -24,6 +26,7 @@ import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -66,8 +69,13 @@ public class DataFormatTest {
         props.put("camel.sink.marshal", "missingDataformat");
 
         CamelSinkTask camelsinkTask = new CamelSinkTask();
-        assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
+        camelsinkTask.start(props);
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
+        records.add(record);
+        assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
         // No need to check the stop method. The error is already thrown/caught during startup.
+        camelsinkTask.stop();
     }
 
     @Test

[camel-kafka-connector] 05/06: Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 8bbd57027cc708e73a9d5b7f2007f66b9ee1f201
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Jul 26 11:34:35 2021 +0200

    Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog
---
 .../utils/CamelKafkaConnectMain.java               | 172 ++++++++++-----------
 .../camel/kafkaconnector/DataFormatTest.java       |   9 +-
 2 files changed, 80 insertions(+), 101 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 0871307..2e8d3a8 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
@@ -31,11 +30,8 @@ import org.apache.camel.builder.ErrorHandlerBuilderRef;
 import org.apache.camel.builder.NoErrorHandlerBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
-import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.CamelSourceTask;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.model.RouteTemplateDefinition;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -46,6 +42,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelKafkaConnectMain extends SimpleMain {
+    public static final String KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcMarshal.";
+    public static final String KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcUnMarshal.";
+    public static final String KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcAggregator.";
+    public static final String KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcIdempotent.";
+    public static final String KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcRemoveHeader.";
+
     private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
 
     protected volatile ConsumerTemplate consumerTemplate;
@@ -228,36 +230,45 @@ public class CamelKafkaConnectMain extends SimpleMain {
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
+            //error handler
+            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
+            if (errorHandler != null) {
+                switch (errorHandler) {
+                    case "no":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
+                        break;
+                    case "default":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+                        break;
+                    default:
+                        break;
+                }
+            }
+
             //dataformats
             if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
+                camelProperties.put(KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
             }
             if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
+                camelProperties.put(KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
             }
 
             //aggregator
             if (!ObjectHelper.isEmpty(aggregationSize)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+                camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
             }
             if (!ObjectHelper.isEmpty(aggregationTimeout)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+                camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
             }
 
             //idempotency
             if (idempotencyEnabled) {
                 switch (expressionType) {
                     case "body":
-                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
-                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
                         break;
                     case "header":
-                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
-                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
                         break;
                     default:
                         break;
@@ -279,117 +290,92 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
             //remove headers
             if (!ObjectHelper.isEmpty(headersExcludePattern)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+                camelProperties.put(KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
             }
 
+            // log filtered properties and set initial camel properties
             List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
             LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
             camelMain.setInitialProperties(camelProperties);
 
-            //error handler
-            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
-            if (errorHandler != null) {
-                switch (errorHandler) {
-                    case "no":
-                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
-                        break;
-                    case "default":
-                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
-                        break;
-                    default:
-                        break;
-                }
-            }
-
             camelMain.configure().addRoutesBuilder(new RouteBuilder() {
                 public void configure() {
 
-                    //creating source template
-                    RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
-                            .templateParameter("fromUrl")
-                            .templateParameter("errorHandler", "ckcErrorHandler")
-
+                    //create marshal template
+                    routeTemplate("ckcMarshal")
                             .templateParameter("marshal", "dummyDataformat")
+                            .from("kamelet:source")
+                            .marshal("{{marshal}}")
+                            .to("kamelet:sink");
+
+                    //create unmarshal template
+                    routeTemplate("ckcUnMarshal")
                             .templateParameter("unmarshal", "dummyDataformat")
+                            .from("kamelet:source")
+                            .marshal("{{unmarshal}}")
+                            .to("kamelet:sink");
 
-                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+                    //create aggregator template
+                    routeTemplate("ckcAggregator")
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
                             .templateParameter("aggregationSize", "1")
                             .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+                            .from("kamelet:source")
+                            .aggregate(constant(true))
+                                .aggregationStrategyRef("{{aggregationStrategy}}")
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}")
+                                .to("kamelet:sink")
+                            .end();
 
+                    //create idempotent template
+                    routeTemplate("ckcIdempotent")
                             .templateParameter("idempotentExpression", "dummyExpression")
                             .templateParameter("idempotentRepository", "ckcIdempotentRepository")
-                            .templateParameter("headersExcludePattern", "(?!)");
-
-
-                    ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
-                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
-                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}");
-                    }
-                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}");
-                    }
-
-                    if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        rdInTemplateSource = rdInTemplateSource.aggregate(s)
-                                .constant(true)
-                                .completionSize("{{aggregationSize}}")
-                                .completionTimeout("{{aggregationTimeout}}");
-                    }
+                            .from("kamelet:source")
+                            .idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}")
+                            .to("kamelet:sink");
 
-                    if (idempotencyEnabled) {
-                        rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
-                    }
+                    //create removeHeader template
+                    routeTemplate("ckcRemoveHeader")
+                            .templateParameter("headersExcludePattern", "(?!)")
+                            .from("kamelet:source")
+                            .removeHeaders("{{headersExcludePattern}}")
+                            .to("kamelet:sink");
 
-                    rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
+                    //creating source template
+                    routeTemplate("ckcSource")
+                            .templateParameter("fromUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            .from("{{fromUrl}}")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
                             .to("kamelet:sink");
 
                     //creating sink template
-                    RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
+                    routeTemplate("ckcSink")
                             .templateParameter("toUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
-                            .templateParameter("marshal", "dummyDataformat")
-                            .templateParameter("unmarshal", "dummyDataformat")
-
-                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
-                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
-                            .templateParameter("aggregationSize", "1")
-                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
-
-                            .templateParameter("idempotentExpression", "dummyExpression")
-                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
-                            .templateParameter("headersExcludePattern", "(?!)");
-
+                            .from("kamelet:source")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
+                            .to("{{toUrl}}");
 
-                    ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
-                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
+                    //creating the actual route
+                    ProcessorDefinition<?> rd = from(from);
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}");
+                        rd = rd.kamelet("ckcMarshal");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}");
+                        rd = rd.kamelet("ckcUnMarshal");
                     }
-
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        rdInTemplateSink = rdInTemplateSink.aggregate(s)
-                                .constant(true)
-                                .completionSize("{{aggregationSize}}")
-                                .completionTimeout("{{aggregationTimeout}}");
+                        rd = rd.kamelet("ckcAggregator");
                     }
-
                     if (idempotencyEnabled) {
-                        rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                        rd = rd.kamelet("ckcIdempotent");
                     }
-
-                    rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
-                            .to("{{toUrl}}");
-
-                    //creating the actual route
-                    from(from).toD(to);
+                    rd = rd.kamelet("ckcRemoveHeader");
+                    rd.toD(to);
                 }
             });
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index c3d26a4..36a886c 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
@@ -26,7 +24,6 @@ import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -69,11 +66,7 @@ public class DataFormatTest {
         props.put("camel.sink.marshal", "missingDataformat");
 
         CamelSinkTask camelsinkTask = new CamelSinkTask();
-        camelsinkTask.start(props);
-        List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
-        records.add(record);
-        assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
+        assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
         // No need to check the stop method. The error is already thrown/caught during startup.
         camelsinkTask.stop();
     }

[camel-kafka-connector] 03/06: Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 68682b197701887e96071f85363fbac52dbac93f
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Wed May 19 10:46:13 2021 +0200

    Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
---
 .../utils/CamelKafkaConnectMain.java               | 25 +++++++++++-----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 036375b..0871307 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -230,12 +230,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
             //dataformats
             if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
             }
             if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
             }
 
             //aggregator
@@ -310,8 +310,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
                             .templateParameter("fromUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
 
-                            .templateParameter("marshall", "dummyDataformat")
-                            .templateParameter("unmarshall", "dummyDataformat")
+                            .templateParameter("marshal", "dummyDataformat")
+                            .templateParameter("unmarshal", "dummyDataformat")
 
                             //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -326,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
                             .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}");
+                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}");
+                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
@@ -351,9 +351,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
                             .templateParameter("toUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
-                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//                            .templateParameter("marshall", "dummyDataformat")
-//                            .templateParameter("unmarshall", "dummyDataformat")
+                            .templateParameter("marshal", "dummyDataformat")
+                            .templateParameter("unmarshal", "dummyDataformat")
 
                             //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -368,10 +367,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
                             .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {

[camel-kafka-connector] 04/06: Polished tests timeouts.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 7044f326c37f623f4ec07476f4a5dcf954450bab
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Jul 19 16:12:09 2021 +0200

    Polished tests timeouts.
---
 .../kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
index 12f0e19..fff41ef 100644
--- a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
@@ -92,7 +92,7 @@ public class CamelSinkGooglePubSubITCase extends CamelSinkTestSupport {
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         List<String> receivedMessages = easyClient.getReceivedMessages();
 
-        if (latch.await(40, TimeUnit.SECONDS)) {
+        if (latch.await(60, TimeUnit.SECONDS)) {
             assertEquals(expected, receivedMessages.size(), "Did not receive as many messages as was sent");
         } else {
             fail("Failed to receive the messages within the specified time");

[camel-kafka-connector] 06/06: Related to #423 added autogeneration of kamelets

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 5873cbcb193bb7474c59d5f6e11010497c1fde0c
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sun Aug 22 00:37:05 2021 +0200

    Related to #423 added autogeneration of kamelets
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  10 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   9 +-
 parent/pom.xml                                     |   7 +
 .../pom.xml                                        |  22 ++
 .../AbstractCamelComponentKafkaConnectorMojo.java  |  34 +++
 .../maven/AbstractCamelKafkaConnectorMojo.java     |  16 +-
 .../AbstractCamelKameletKafkaConnectorMojo.java    |  34 +++
 .../maven/CamelKafkaConnectorCreateMojo.java       |   4 +-
 .../maven/CamelKafkaConnectorDeleteMojo.java       |   4 +-
 ...a => CamelKafkaConnectorKameletCreateMojo.java} |  20 +-
 ...a => CamelKafkaConnectorKameletUpdateMojo.java} | 233 +++++++++------------
 .../maven/CamelKafkaConnectorUpdateMojo.java       |  12 +-
 .../maven/GenerateCamelKafkaConnectorsMojo.java    | 183 +++++++++++++---
 .../kafkaconnector/maven/model/KameletModel.java   |  92 ++++++++
 .../maven/model/KameletPropertyModel.java          |  97 +++++++++
 .../maven/utils/YamlKameletMapper.java             |  90 ++++++++
 ...mel-kafka-connector-fix-dependencies.properties |  10 +-
 ...-connector-kamelet-fix-dependencies.properties} |   0
 ...l-kafka-connector-kamelet-template-pom.template | 105 ++++++++++
 .../maven/GenerateCamelKafkaConnectorsMojoIT.java  |  30 +--
 .../maven/utils/YamlKameletMapperTests.java        |  81 +++++++
 .../test_generate/pom.xml                          |  11 +
 ...-connector-kamelet-fix-dependencies.properties} |   0
 .../resources/kamelets/aws-s3-source.kamelet.yaml  |  83 ++++++++
 .../resources/kamelets/nodependencies.kamelet.yaml |  80 +++++++
 .../resources/kamelets/noproperties.kamelet.yaml   |  33 +++
 .../resources/template-connecotr-kamelet-pom.xml   | 105 ++++++++++
 tooling/pom.xml                                    |   1 +
 28 files changed, 1185 insertions(+), 221 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 4e5a201..b1dbf17 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -38,6 +38,7 @@ import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ public class CamelSinkTask extends SinkTask {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class);
 
     private static final String LOCAL_URL = "direct:start";
+    private static final String DEFAULT_KAMELET_CKC_SINK = "kamelet:ckcSink";
     private ErrantRecordReporter reporter;
 
     private CamelKafkaConnectMain cms;
@@ -123,7 +125,7 @@ public class CamelSinkTask extends SinkTask {
             }
             actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink")
+            cms = CamelKafkaConnectMain.builder(LOCAL_URL, getSinkKamelet())
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
@@ -144,7 +146,6 @@ public class CamelSinkTask extends SinkTask {
                 .withHeadersExcludePattern(headersRemovePattern)
                 .build(camelContext);
 
-
             cms.start();
 
             producer = cms.getProducerTemplate();
@@ -156,6 +157,11 @@ public class CamelSinkTask extends SinkTask {
         }
     }
 
+    @NotNull
+    protected String getSinkKamelet() {
+        return DEFAULT_KAMELET_CKC_SINK;
+    }
+
     protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(Map<String, String> props) {
         return new CamelSinkConnectorConfig(props);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 77ce636..5c63323 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +60,7 @@ public class CamelSourceTask extends SourceTask {
     private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path.";
 
     private static final String LOCAL_URL = "seda:end";
+    private static final String DEFAULT_KAMELET_CKC_SOURCE = "kamelet:ckcSource";
 
     private CamelKafkaConnectMain cms;
     private PollingConsumer consumer;
@@ -148,7 +150,7 @@ public class CamelSourceTask extends SourceTask {
             }
             actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl)
+            cms = CamelKafkaConnectMain.builder(getSourceKamelet(), localUrl)
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
@@ -180,6 +182,11 @@ public class CamelSourceTask extends SourceTask {
         }
     }
 
+    @NotNull
+    protected String getSourceKamelet() {
+        return DEFAULT_KAMELET_CKC_SOURCE;
+    }
+
     private long remaining(long startPollEpochMilli, long maxPollDuration)  {
         return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
     }
diff --git a/parent/pom.xml b/parent/pom.xml
index 0d13f32..d7c1b32 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -29,6 +29,7 @@
 
         <kafka.version>2.8.0</kafka.version>
         <camel.version>3.11.1</camel.version>
+        <camel.kamelet.catalog.version>0.3.0</camel.kamelet.catalog.version>
         <apicurio.registry.version>1.3.2.Final</apicurio.registry.version>
         <resteasy.version>4.5.6.Final</resteasy.version>
         <version.java>1.8</version.java>
@@ -117,6 +118,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.camel.kamelets</groupId>
+                <artifactId>camel-kamelets-catalog</artifactId>
+                <version>${camel.kamelet.catalog.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.jctools</groupId>
                 <artifactId>jctools-core</artifactId>
                 <version>${version.jctools}</version>
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/pom.xml b/tooling/camel-kafka-connector-generator-maven-plugin/pom.xml
index 89e5bfd..c0b91b5 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/pom.xml
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/pom.xml
@@ -51,6 +51,11 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.github.classgraph</groupId>
+            <artifactId>classgraph</artifactId>
+            <version>${classgraph.version}</version>
+        </dependency>
 
         <!-- Maven plugin deps -->
         <dependency>
@@ -106,6 +111,18 @@
             <version>2.3.1</version>
         </dependency>
 
+        <!--  jackson  -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+<!--            <version>${jackson.version}</version>-->
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+<!--            <version>${jackson.version}</version>-->
+        </dependency>
+
         <!-- camel -->
         <dependency>
             <groupId>org.apache.camel</groupId>
@@ -121,6 +138,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.camel.kamelets</groupId>
+            <artifactId>camel-kamelets-catalog</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-package-maven-plugin</artifactId>
             <version>${camel.version}</version>
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelComponentKafkaConnectorMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelComponentKafkaConnectorMojo.java
new file mode 100644
index 0000000..ea9e4b6
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelComponentKafkaConnectorMojo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.maven;
+
+import org.apache.maven.plugins.annotations.Parameter;
+
+public abstract class AbstractCamelComponentKafkaConnectorMojo extends AbstractCamelKafkaConnectorMojo {
+
+    /**
+     * The initial pom template file.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-template-pom.template")
+    protected String initialPomTemplate;
+
+    /**
+     * Properties file to configure additional dependencies.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-fix-dependencies.properties")
+    protected String fixDependenciesProperties;
+}
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
index ac810a2..f18f17b 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
@@ -54,12 +54,6 @@ public abstract class AbstractCamelKafkaConnectorMojo extends AbstractMojo {
     protected MavenProject project;
 
     /**
-     * The initial pom template file.
-     */
-    @Parameter(defaultValue = "camel-kafka-connector-template-pom.template")
-    protected String initialPomTemplate;
-
-    /**
      * NOTICE file.
      */
     @Parameter(defaultValue = "camel-kafka-connector-NOTICE.txt")
@@ -72,12 +66,6 @@ public abstract class AbstractCamelKafkaConnectorMojo extends AbstractMojo {
     protected String licenseTemplate;
 
     /**
-     * Properties file to configure additional dependencies.
-     */
-    @Parameter(defaultValue = "camel-kafka-connector-fix-dependencies.properties")
-    protected String fixDependenciesProperties;
-
-    /**
      * Package file template to be placed in src/main/assembly/package.xml.
      */
     @Parameter(defaultValue = "camel-kafka-connector-template-package.template")
@@ -130,13 +118,13 @@ public abstract class AbstractCamelKafkaConnectorMojo extends AbstractMojo {
     /**
      * Execute goal.
      *
-     * @throws MojoExecutionException execution of the main class or one of the
-     *                                                        threads it generated failed.
+     * @throws MojoExecutionException execution of the main class or one of the threads it generated failed.
      * @throws MojoFailureException   something bad happened...
      */
     @Override
     public void execute() throws MojoExecutionException, MojoFailureException {
         configureResourceManager();
+        //execute only once for the connectors parent project which can be configured with <connectorsProjectName> option
         if (!project.getArtifactId().equals(connectorsProjectName)) {
             getLog().debug("Skipping project " + project.getArtifactId() + " since it is not " + connectorsProjectName + ", which can be configured with <connectorsProjectName> option.");
             return;
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKameletKafkaConnectorMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKameletKafkaConnectorMojo.java
new file mode 100644
index 0000000..8459a3a
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKameletKafkaConnectorMojo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.maven;
+
+import org.apache.maven.plugins.annotations.Parameter;
+
+public abstract class AbstractCamelKameletKafkaConnectorMojo extends AbstractCamelKafkaConnectorMojo {
+
+    /**
+     * The initial kamelet pom template file.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-kamelet-template-pom.template")
+    protected String initialKameletPomTemplate;
+
+    /**
+     * Properties kamelet file to configure additional dependencies.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-kamelet-fix-dependencies.properties")
+    protected String fixKameletDependenciesProperties;
+}
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorCreateMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorCreateMojo.java
index ad92b69..c8f5b92 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorCreateMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorCreateMojo.java
@@ -42,7 +42,7 @@ import static org.apache.camel.kafkaconnector.maven.utils.MavenUtils.writeXmlFor
 
 @Mojo(name = "camel-kafka-connector-create", threadSafe = true,
         defaultPhase = LifecyclePhase.GENERATE_RESOURCES)
-public class CamelKafkaConnectorCreateMojo extends AbstractCamelKafkaConnectorMojo {
+public class CamelKafkaConnectorCreateMojo extends AbstractCamelComponentKafkaConnectorMojo {
 
     @Parameter(property = "name", required = true)
     protected String name;
@@ -115,7 +115,7 @@ public class CamelKafkaConnectorCreateMojo extends AbstractCamelKafkaConnectorMo
         props.put("componentDescription", name);
         try {
             Document pom = MavenUtils.createCrateXmlDocumentFromTemplate(pomTemplate, props);
-            // Write the starter pom
+            // Write the connector pom
             File pomFile = new File(directory, "pom.xml");
             writeXmlFormatted(pom, pomFile, getLog());
         } catch (Exception e) {
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorDeleteMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorDeleteMojo.java
index d377396..ca25646 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorDeleteMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorDeleteMojo.java
@@ -34,7 +34,7 @@ import static org.apache.camel.kafkaconnector.maven.utils.MavenUtils.sanitizeMav
 
 @Mojo(name = "camel-kafka-connector-delete", threadSafe = true,
         defaultPhase = LifecyclePhase.GENERATE_RESOURCES)
-public class CamelKafkaConnectorDeleteMojo extends AbstractCamelKafkaConnectorMojo {
+public class CamelKafkaConnectorDeleteMojo extends AbstractCamelComponentKafkaConnectorMojo {
 
     @Parameter(property = "name", required = true)
     protected String name;
@@ -58,7 +58,7 @@ public class CamelKafkaConnectorDeleteMojo extends AbstractCamelKafkaConnectorMo
         try {
             deleteConnector();
         } catch (Exception e) {
-            throw new MojoFailureException("Fail to create connector " + name, e);
+            throw new MojoFailureException("Fail to delete connector " + name, e);
         }
     }
 
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorCreateMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorKameletCreateMojo.java
similarity index 89%
copy from tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorCreateMojo.java
copy to tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorKameletCreateMojo.java
index ad92b69..8cbabc3 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorCreateMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorKameletCreateMojo.java
@@ -30,8 +30,6 @@ import org.w3c.dom.Document;
 
 import freemarker.template.Template;
 import org.apache.camel.kafkaconnector.maven.utils.MavenUtils;
-import org.apache.camel.tooling.model.ComponentModel;
-import org.apache.camel.tooling.model.JsonMapper;
 import org.apache.maven.plugin.MojoFailureException;
 import org.apache.maven.plugins.annotations.LifecyclePhase;
 import org.apache.maven.plugins.annotations.Mojo;
@@ -40,22 +38,19 @@ import org.apache.maven.plugins.annotations.Parameter;
 import static org.apache.camel.kafkaconnector.maven.utils.MavenUtils.sanitizeMavenArtifactId;
 import static org.apache.camel.kafkaconnector.maven.utils.MavenUtils.writeXmlFormatted;
 
-@Mojo(name = "camel-kafka-connector-create", threadSafe = true,
+@Mojo(name = "camel-kafka-connector-kamelet-create", threadSafe = true,
         defaultPhase = LifecyclePhase.GENERATE_RESOURCES)
-public class CamelKafkaConnectorCreateMojo extends AbstractCamelKafkaConnectorMojo {
+public class CamelKafkaConnectorKameletCreateMojo extends AbstractCamelKameletKafkaConnectorMojo {
 
     @Parameter(property = "name", required = true)
     protected String name;
 
-    @Parameter(property = "componentJson", required = true)
-    protected String componentJson;
-
     @Parameter(property = "overridePomFile", required = false, defaultValue = "false")
     protected Boolean overridePomFile;
 
     @Override
     protected String getMainDepArtifactId() {
-        return "camel-" + name;
+        return name;
     }
 
     @Override
@@ -77,7 +72,7 @@ public class CamelKafkaConnectorCreateMojo extends AbstractCamelKafkaConnectorMo
     }
 
     private void createConnector() throws Exception {
-        getLog().info("Creating camel kafka connector for " + name);
+        getLog().info("Creating camel kafka kamelet connector for " + name);
         String sanitizedName = sanitizeMavenArtifactId(name);
         //check if the connector is already created
         File directory = new File(projectDir, "camel-" + sanitizedName + KAFKA_CONNECTORS_SUFFIX);
@@ -103,19 +98,16 @@ public class CamelKafkaConnectorCreateMojo extends AbstractCamelKafkaConnectorMo
 
     private void generateAndWritePom(String sanitizedName, File directory) throws Exception {
         //create initial connector pom
-        ComponentModel cm = JsonMapper.generateComponentModel(componentJson);
         getLog().info("Creating a new pom.xml for the connector from scratch");
-        Template pomTemplate = MavenUtils.getTemplate(rm.getResourceAsFile(initialPomTemplate));
+        Template pomTemplate = MavenUtils.getTemplate(rm.getResourceAsFile(initialKameletPomTemplate));
         Map<String, String> props = new HashMap<>();
         props.put("version", project.getVersion());
-        props.put("dependencyId", cm.getArtifactId());
-        props.put("dependencyGroup", cm.getGroupId());
         props.put("componentName", name);
         props.put("componentSanitizedName", sanitizedName);
         props.put("componentDescription", name);
         try {
             Document pom = MavenUtils.createCrateXmlDocumentFromTemplate(pomTemplate, props);
-            // Write the starter pom
+            // Write the connector pom
             File pomFile = new File(directory, "pom.xml");
             writeXmlFormatted(pom, pomFile, getLog());
         } catch (Exception e) {
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorUpdateMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorKameletUpdateMojo.java
similarity index 82%
copy from tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorUpdateMojo.java
copy to tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorKameletUpdateMojo.java
index dba04b2..5d72364 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorUpdateMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorKameletUpdateMojo.java
@@ -33,7 +33,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import javax.annotation.Generated;
@@ -43,6 +42,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
 import javax.xml.xpath.XPathFactory;
 
 import org.w3c.dom.Document;
@@ -51,14 +51,14 @@ import org.w3c.dom.Node;
 import org.xml.sax.SAXException;
 
 import freemarker.template.Template;
+import org.apache.camel.kafkaconnector.maven.model.KameletModel;
+import org.apache.camel.kafkaconnector.maven.model.KameletPropertyModel;
 import org.apache.camel.kafkaconnector.maven.utils.JsonMapperKafkaConnector;
 import org.apache.camel.kafkaconnector.maven.utils.MavenUtils;
+import org.apache.camel.kafkaconnector.maven.utils.YamlKameletMapper;
 import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorModel;
 import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorOptionModel;
 import org.apache.camel.maven.packaging.MvelHelper;
-import org.apache.camel.tooling.model.BaseOptionModel;
-import org.apache.camel.tooling.model.ComponentModel;
-import org.apache.camel.tooling.model.JsonMapper;
 import org.apache.camel.tooling.util.Strings;
 import org.apache.camel.tooling.util.srcgen.JavaClass;
 import org.apache.camel.tooling.util.srcgen.Method;
@@ -88,13 +88,13 @@ import static org.apache.camel.tooling.util.PackageHelper.loadText;
 import static org.apache.camel.tooling.util.PackageHelper.writeText;
 
 /**
- * Generate Camel Kafka Connector for the component
+ * Generate Camel Kafka Connector for the kamelet
  */
-@Mojo(name = "camel-kafka-connector-update", threadSafe = true, 
+@Mojo(name = "camel-kafka-connector-kamelet-update", threadSafe = true,
 requiresDependencyCollection = ResolutionScope.COMPILE_PLUS_RUNTIME, 
 requiresDependencyResolution = ResolutionScope.COMPILE_PLUS_RUNTIME, 
 defaultPhase = LifecyclePhase.GENERATE_RESOURCES)
-public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMojo {
+public class CamelKafkaConnectorKameletUpdateMojo extends AbstractCamelKameletKafkaConnectorMojo {
 
     private static final String GENERATED_SECTION_START = "START OF GENERATED CODE";
     private static final String GENERATED_SECTION_START_COMMENT = "<!--" + GENERATED_SECTION_START + "-->";
@@ -102,6 +102,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
     private static final String GENERATED_SECTION_END_COMMENT = "<!--" + GENERATED_SECTION_END + "-->";
 
     private static final String EXCLUDE_DEPENDENCY_PROPERTY_PREFIX = "exclude_";
+
     private static final String ADDITIONAL_COMMON_PROPERTIES_PROPERTY_PREFIX = "additional_properties_";
     private static final String XML_FEATURES_DISALLOW_DOCTYPE_DECL = "http://apache.org/xml/features/disallow-doctype-decl";
 
@@ -122,6 +123,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         PRIMITIVE_TYPES_TO_CLASS_MAP.put("boolean", Boolean.class);
         PRIMITIVE_TYPES_TO_CLASS_MAP.put("long", Long.class);
         PRIMITIVE_TYPES_TO_CLASS_MAP.put("int", Integer.class);
+        PRIMITIVE_TYPES_TO_CLASS_MAP.put("integer", Integer.class);
         PRIMITIVE_TYPES_TO_CLASS_MAP.put("short", Short.class);
         PRIMITIVE_TYPES_TO_CLASS_MAP.put("double", Double.class);
         PRIMITIVE_TYPES_TO_CLASS_MAP.put("float", Float.class);
@@ -130,6 +132,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.put("boolean", "ConfigDef.Type.BOOLEAN");
         PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.put("long", "ConfigDef.Type.LONG");
         PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.put("int", "ConfigDef.Type.INT");
+        PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.put("integer", "ConfigDef.Type.INT");
         PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.put("short", "ConfigDef.Type.SHORT");
         PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.put("double", "ConfigDef.Type.DOUBLE");
         PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.put("float", "ConfigDef.Type.DOUBLE");
@@ -143,8 +146,8 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
     @Parameter(property = "name", required = true)
     protected String name;
 
-    @Parameter(property = "componentJson", required = true)
-    protected String componentJson;
+    @Parameter(property = "kameletYaml", required = true)
+    protected String kameletYaml;
 
     /**
      * The maven session.
@@ -197,23 +200,29 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
     }
 
     private void updateConnector() throws Exception {
+        KameletModel kamelet = YamlKameletMapper.parseKameletYaml(kameletYaml);
         String sanitizedName = sanitizeMavenArtifactId(name);
-        // create the starter directory
-        File connectorDir = new File(projectDir, "camel-" + sanitizedName + KAFKA_CONNECTORS_SUFFIX);
+        // create the connector directory
+        File connectorDir = new File(projectDir,  "camel-" + sanitizedName + KAFKA_CONNECTORS_SUFFIX);
         if (!connectorDir.exists() || !connectorDir.isDirectory()) {
-            getLog().info("Connector " + name + " can not be updated since directory " + connectorDir.getAbsolutePath() + " dose not exist.");
-            throw new MojoFailureException("Directory already exists: " + connectorDir);
+            getLog().info("Connector " + name + " can not be updated since directory " + connectorDir.getAbsolutePath() + " dose not exist or is not a directory (maybe use camel-kafka-connector-kamelet-create first).");
+            throw new MojoFailureException("Directory dose not already exists or is not a directory: " + connectorDir);
         }
 
         // create the base pom.xml
         Document pom = createBasePom(connectorDir);
 
-        // Apply changes to the starter pom
-        fixExcludedDependencies(pom);
-        fixAdditionalDependencies(pom, additionalDependencies);
+        // Apply changes to the connector pomDocument pom
+        Set<String> dependencies = new HashSet<>();
+        dependencies.addAll(getKameletDependencies(kamelet));
+        dependencies.addAll(getAdditionalDependencies(additionalDependencies));
+        if (!dependencies.isEmpty()) {
+            getLog().debug("The following dependencies will be added to the connector: " + dependencies);
+            MavenUtils.addDependencies(pom, dependencies, GENERATED_SECTION_START, GENERATED_SECTION_END);
+        }
         fixAdditionalRepositories(pom);
 
-        // Write the starter pom
+        // Write the connector pom
         File pomFile = new File(connectorDir, "pom.xml");
         writeXmlFormatted(pom, pomFile, getLog());
 
@@ -225,57 +234,49 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         // write LICENSE, USAGE
         writeStaticFiles(connectorDir);
 
+        // write kamlete yaml file
+        File docFolder = new File(connectorDir, "src/main/resources/kamelets");
+        File docFile = new File(docFolder, name + ".kamelet.yaml");
+        updateFile(docFile, kameletYaml);
+
         // generate classes
-        ComponentModel model = JsonMapper.generateComponentModel(componentJson);
-        if (model.isConsumerOnly()) {
-            createClasses(sanitizedName, connectorDir, model, ConnectorType.SOURCE);
-        } else if (model.isProducerOnly()) {
-            createClasses(sanitizedName, connectorDir, model, ConnectorType.SINK);
-        } else {
-            createClasses(sanitizedName, connectorDir, model, ConnectorType.SOURCE);
-            createClasses(sanitizedName, connectorDir, model, ConnectorType.SINK);
+        String kameletType = kamelet.getType().toLowerCase();
+        switch (kameletType) {
+            case "source":
+                createClassesAndDocumentation(sanitizedName, connectorDir, kamelet, ConnectorType.SOURCE);
+                break;
+            case "sink":
+                createClassesAndDocumentation(sanitizedName, connectorDir, kamelet, ConnectorType.SINK);
+                break;
+            default:
+                getLog().warn("Unsupported kamelet type: " + kameletType);
         }
     }
 
-    private void fixExcludedDependencies(Document pom) throws Exception {
-        // add dependencies to be excluded form camel component dependency
-        Set<String> loggingImpl = new HashSet<>();
-
-        // excluded dependencies
-        Set<String> configExclusions = new HashSet<>();
-        Properties properties = new Properties();
-
-        try (InputStream stream = new FileInputStream(rm.getResourceAsFile(fixDependenciesProperties))) {
-            properties.load(stream);
-        }
-
-        String artExcl = properties.getProperty(EXCLUDE_DEPENDENCY_PROPERTY_PREFIX + getMainDepArtifactId());
-        getLog().debug("Configured exclusions: " + artExcl);
-        if (artExcl != null && artExcl.trim().length() > 0) {
-            for (String dep : artExcl.split(",")) {
-                getLog().debug("Adding configured exclusion: " + dep);
-                configExclusions.add(dep);
+    private Set<String> getKameletDependencies(KameletModel kamelet) throws XPathExpressionException {
+        Set<String> deps = new HashSet<>(kamelet.getDependencies());
+        Set<String> gavDeps = deps.stream().map(stringDep -> {
+            if (stringDep.startsWith("mvn:")) {
+                return stringDep.replaceFirst("mvn:", "");
+            } else if (stringDep.startsWith("camel:")) {
+                return getMainDepGroupId() + ":" + stringDep.replaceFirst(":", "-");
+            } else {
+                getLog().warn("Dependency " + stringDep + "is used as is. Might not be the intended behaviour!");
+                return stringDep;
             }
-        }
+        }).collect(Collectors.toSet());
 
-        Set<String> libsToRemove = new TreeSet<>();
-        libsToRemove.addAll(loggingImpl);
-        libsToRemove.addAll(configExclusions);
-
-        if (!libsToRemove.isEmpty()) {
-            getLog().info("Camel-kafka-connector: the following dependencies will be removed from the connector: " + libsToRemove);
-            MavenUtils.addExclusionsToDependency(pom, getMainDepArtifactId(), libsToRemove, GENERATED_SECTION_START, GENERATED_SECTION_END);
-        }
+        return gavDeps;
     }
 
-    private void fixAdditionalDependencies(Document pom, String additionalDependencies) throws Exception {
+    private Set<String> getAdditionalDependencies(String additionalDependencies) throws Exception {
         Properties properties = new Properties();
 
-        try (InputStream stream = new FileInputStream(rm.getResourceAsFile(fixDependenciesProperties))) {
+        try (InputStream stream = new FileInputStream(rm.getResourceAsFile(fixKameletDependenciesProperties))) {
             properties.load(stream);
         }
 
-        Set<String> deps = new TreeSet<>();
+        Set<String> deps = new HashSet<>();
         deps.addAll(MavenUtils.csvToSet(properties.getProperty(getMainDepArtifactId())));
         deps.addAll(MavenUtils.csvToSet(additionalDependencies));
 
@@ -295,10 +296,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
             deps.addAll(globalProps);
         }
 
-        if (!deps.isEmpty()) {
-            getLog().debug("The following dependencies will be added to the starter: " + deps);
-            MavenUtils.addDependencies(pom, deps, GENERATED_SECTION_START, GENERATED_SECTION_END);
-        }
+        return deps;
     }
 
     private void fixAdditionalRepositories(Document pom) throws Exception {
@@ -348,7 +346,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
                         pom = builder.parse(contentIn);
                     }
 
-                    getLog().debug("Reusing the existing pom.xml for the starter");
+                    getLog().debug("Reusing the existing pom.xml for the connector");
                     return pom;
                 } else {
                     getLog().error("Cannot use the existing pom.xml file since it is not editable. It does not contain " + GENERATED_SECTION_START_COMMENT);
@@ -357,8 +355,8 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
                 }
             }
         } else {
-            getLog().error("The pom.xml file is not present, please use camel-kafka-connector-create first.");
-            throw new UnsupportedOperationException("The pom.xml file is not present, please use camel-kafka-connector-create first.");
+            getLog().error("The pom.xml file is not present, please use camel-kafka-connector-kamelet-create first.");
+            throw new UnsupportedOperationException("The pom.xml file is not present, please use camel-kafka-connector-kamelet-create first.");
         }
     }
 
@@ -374,7 +372,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         writeFileIfChanged(license, new File(connectorDir, "src/main/resources/META-INF/LICENSE.txt"), getLog());
     }
 
-    private void createClasses(String sanitizedName, File connectorDir, ComponentModel model, ConnectorType ct)
+    private void createClassesAndDocumentation(String sanitizedName, File connectorDir, KameletModel kamelet, ConnectorType ct)
         throws MojoFailureException, ResourceNotFoundException, FileResourceCreationException, IOException, MojoExecutionException {
         String ctCapitalizedName = StringUtils.capitalize(ct.name().toLowerCase());
         String ctLowercaseName = ct.name().toLowerCase();
@@ -382,7 +380,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         Map<String, String> additionalProperties = new HashMap<>();
         Properties properties = new Properties();
 
-        try (InputStream stream = new FileInputStream(rm.getResourceAsFile(fixDependenciesProperties))) {
+        try (InputStream stream = new FileInputStream(rm.getResourceAsFile(fixKameletDependenciesProperties))) {
             properties.load(stream);
         }
 
@@ -413,37 +411,12 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         Method confMethod = javaClassConnectorConfig.addMethod().setConstructor(false).setName("conf").setReturnType("ConfigDef").setPublic().setStatic()
             .setBody("ConfigDef conf = new ConfigDef(Camel" + ctCapitalizedName + "ConnectorConfig.conf());\n");
 
-        Predicate<? super BaseOptionModel> filterEndpointOptions;
-        switch (ct) {
-            case SINK:
-                filterEndpointOptions = new Predicate<BaseOptionModel>() {
-                    @Override
-                    public boolean test(BaseOptionModel optionModel) {
-                        return optionModel.getLabel() == null || optionModel.getLabel().contains("producer")
-                               || (!optionModel.getLabel().contains("producer") && !optionModel.getLabel().contains("consumer"));
-                    }
-                };
-                break;
-            case SOURCE:
-                filterEndpointOptions = new Predicate<BaseOptionModel>() {
-                    @Override
-                    public boolean test(BaseOptionModel optionModel) {
-                        return optionModel.getLabel() == null || optionModel.getLabel().contains("consumer")
-                               || (!optionModel.getLabel().contains("producer") && !optionModel.getLabel().contains("consumer"));
-                    }
-                };
-                break;
-            default:
-                throw new UnsupportedOperationException("Connector type not supported: " + ct + " must be one of " + ConnectorType.SINK + ", " + ConnectorType.SOURCE);
-        }
-
+        // instantiate CamelKafkaConnectorOptionModel for further use during documentation generation
         List<CamelKafkaConnectorOptionModel> listOptions = new ArrayList<>();
-        model.getEndpointPathOptions().stream().filter(filterEndpointOptions)
-            .forEachOrdered(epo -> addConnectorOptions(sanitizedName, ct, javaClassConnectorConfig, confMethod, "PATH", ctLowercaseName, "path", epo, listOptions));
-        model.getEndpointParameterOptions().stream().filter(filterEndpointOptions)
-            .forEachOrdered(epo -> addConnectorOptions(sanitizedName, ct, javaClassConnectorConfig, confMethod, "ENDPOINT", ctLowercaseName, "endpoint", epo, listOptions));
-        model.getComponentOptions().stream().filter(filterEndpointOptions)
-            .forEachOrdered(co -> addConnectorOptions(sanitizedName, ct, javaClassConnectorConfig, confMethod, "COMPONENT", "component", sanitizedName, co, listOptions));
+        List<KameletPropertyModel> kameletProperties = kamelet.getProperties();
+        kameletProperties.forEach(
+            kameletProperty -> addConnectorOptions(sanitizedName, ct, javaClassConnectorConfig, confMethod, "KAMELET", kameletProperty, listOptions, kamelet.getRequiredProperties())
+        );
 
         confMethod.setBody(confMethod.getBody() + "return conf;");
 
@@ -466,15 +439,21 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         javaClassTask.addMethod().setConstructor(false).setName("getCamel" + ctCapitalizedName + "ConnectorConfig").setProtected().addParameter("Map<String, String>", "props")
             .setReturnType("Camel" + ctCapitalizedName + "ConnectorConfig")
             .setBody("return new Camel" + StringUtils.capitalize(sanitizedName.replace("-", "")) + ctCapitalizedName + "ConnectorConfig(props);").addAnnotation(Override.class);
-        Method getDefaultConfigMethod = javaClassTask.addMethod().setConstructor(false).setName("getDefaultConfig").setProtected().setReturnType("Map<String, String>")
-            .setBody("return new HashMap<String, String>() {{\n");
-        getDefaultConfigMethod
-            .setBody(getDefaultConfigMethod.getBody() + "    put(Camel" + ctCapitalizedName + "ConnectorConfig.CAMEL_" + ct + "_COMPONENT_CONF, \"" + model.getScheme() + "\");\n");
-        for (String key : new TreeSet<String>(additionalProperties.keySet())) {
-            getDefaultConfigMethod.setBody(getDefaultConfigMethod.getBody() + "    put(\"" + key + "\", \"" + additionalProperties.get(key) + "\");\n");
+
+        if (!additionalProperties.keySet().isEmpty()) {
+            Method getDefaultConfigMethod = javaClassTask.addMethod().setConstructor(false).setName("getDefaultConfig").setProtected().setReturnType("Map<String, String>")
+                    .setBody("return new HashMap<String, String>() {{\n");
+            for (String key : new TreeSet<String>(additionalProperties.keySet())) {
+                getDefaultConfigMethod.setBody(getDefaultConfigMethod.getBody() + "    put(\"" + key + "\", \"" + additionalProperties.get(key) + "\");\n");
+            }
+            getDefaultConfigMethod.setBody(getDefaultConfigMethod.getBody() + "}};\n");
+            getDefaultConfigMethod.addAnnotation(Override.class);
         }
-        getDefaultConfigMethod.setBody(getDefaultConfigMethod.getBody() + "}};\n");
-        getDefaultConfigMethod.addAnnotation(Override.class);
+
+        Method getSinkOrSourceKameletMethod = javaClassTask.addMethod().setConstructor(false).setName("get" + ctCapitalizedName + "Kamelet").setProtected().setReturnType("String")
+                .setBody("return \"kamelet:" + name + "\"");
+        getSinkOrSourceKameletMethod.addAnnotation(Override.class);
+
         String javaClassTaskFileName = packageName.replaceAll("\\.", "\\/") + File.separator + javaClassTaskName + ".java";
         MavenUtils.writeSourceIfChanged(javaClassTask, javaClassTaskFileName, false, connectorDir, rm.getResourceAsFile(javaFilesHeader));
 
@@ -535,6 +514,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
                 }
             }
         }
+
         // docs/examples/Camel{sanitizedName}{Sink,Source}.properties
         try {
             String examplesPropertiestemplate = null;
@@ -568,10 +548,9 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         File docFile = new File(docFolder, getMainDepArtifactId() + "-kafka-" + ct.name().toLowerCase() + "-connector.adoc");
         File docFolderWebsite = new File(projectBaseDir, "docs/modules/ROOT/pages/connectors/");
         File docFileWebsite = new File(docFolderWebsite, getMainDepArtifactId() + "-kafka-" + ct.name().toLowerCase() + "-connector.adoc");
-        String changed = templateAutoConfigurationOptions(listOptions, model.getDescription(), connectorDir, ct, packageName + "." + javaClassConnectorName, convertersList,
+        String changed = templateAutoConfigurationOptions(listOptions, kamelet.getDescription(), connectorDir, ct, packageName + "." + javaClassConnectorName, convertersList,
                                                           transformsList, aggregationStrategiesList);
 
-
         boolean updated = updateAutoConfigureOptions(docFile, changed);
         if (updated) {
             getLog().info("Updated doc file: " + docFile);
@@ -586,7 +565,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         }
 
         // generate json descriptor src/generated/resources/<connector-name>.json
-        writeJson(listOptions, model.getDescription(), connectorDir, ct, packageName + "." + javaClassConnectorName, convertersList, transformsList, aggregationStrategiesList);
+        writeJson(listOptions, kamelet.getDescription(), connectorDir, ct, packageName + "." + javaClassConnectorName, convertersList, transformsList, aggregationStrategiesList);
         // generate descriptor src/generated/descriptors/connector-{sink,source}.properties
         writeDescriptors(connectorDir, ct);
     }
@@ -602,39 +581,37 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         }
     }
 
-    private void addConnectorOptions(String sanitizedName, ConnectorType ct, JavaClass javaClass, Method confMethod, String propertyQualifier, String firstNamespace,
-                                     String secondNamespace, BaseOptionModel baseOptionModel, List<CamelKafkaConnectorOptionModel> listOptions) {
-        String propertyName = baseOptionModel.getName();
+    private void addConnectorOptions(String sanitizedName, ConnectorType ct, JavaClass javaClass, Method confMethod, String propertyQualifier, KameletPropertyModel kameletProperty,
+                                     List<CamelKafkaConnectorOptionModel> listOptions, Set<String> kameletRequiredProperties) {
+        String propertyName = kameletProperty.getName();
 
         String regex = "([A-Z][a-z]+)";
         String replacement = "$1_";
 
         String propertyPrefix = "CAMEL_" + ct + "_" + sanitizedName.replace("-", "").toUpperCase() + "_" + propertyQualifier.toUpperCase() + "_"
                                 + StringUtils.capitalize(propertyName).replaceAll(regex, replacement).toUpperCase();
-        String propertyValue = "camel." + firstNamespace + "." + secondNamespace + "." + baseOptionModel.getName();
+        String propertyValue = "camel.kamelet." + propertyName;
 
         String confFieldName = propertyPrefix + "CONF";
         javaClass.addField().setFinal(true).setPublic().setStatic(true).setName(confFieldName).setType(String.class).setStringInitializer(propertyValue);
 
         String docFieldName = propertyPrefix + "DOC";
-        String docLiteralInitializer = baseOptionModel.getDescription();
-        if (baseOptionModel.getEnums() != null && !baseOptionModel.getEnums().isEmpty()) {
-            docLiteralInitializer = docLiteralInitializer + " One of:";
-            String enumOptionListing = baseOptionModel.getEnums().stream().reduce("", (s, s2) -> s + " [" + s2 + "]");
-            docLiteralInitializer = docLiteralInitializer + enumOptionListing;
+        String docLiteralInitializer = kameletProperty.getDescription();
+        if (kameletProperty.getExample() != null) {
+            docLiteralInitializer = docLiteralInitializer + " Example: " + kameletProperty.getExample();
         }
         javaClass.addField().setFinal(true).setPublic().setStatic(true).setName(docFieldName).setType(String.class).setStringInitializer(docLiteralInitializer);
 
         String defaultFieldName = propertyPrefix + "DEFAULT";
-        Class<?> defaultValueClass = PRIMITIVE_TYPES_TO_CLASS_MAP.getOrDefault(baseOptionModel.getShortJavaType(), String.class);
-        String type = baseOptionModel.getType();
+        Class<?> defaultValueClass = PRIMITIVE_TYPES_TO_CLASS_MAP.getOrDefault(kameletProperty.getType(), String.class);
+        String type = defaultValueClass.getSimpleName();
 
         String defaultValueClassLiteralInitializer;
-        if (baseOptionModel.getDefaultValue() == null) {
+        if (kameletProperty.getDefaultValue() == null) {
             //Handling null default camel options values (that means there is no default value).
             defaultValueClassLiteralInitializer = "null";
         } else {
-            defaultValueClassLiteralInitializer = baseOptionModel.getDefaultValue().toString();
+            defaultValueClassLiteralInitializer = kameletProperty.getDefaultValue();
             if (defaultValueClass.equals(String.class)) {
                 defaultValueClassLiteralInitializer = "\"" + defaultValueClassLiteralInitializer + "\"";
             }
@@ -666,22 +643,21 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
             .setLiteralInitializer(defaultValueClassLiteralInitializer);
 
         String confType;
-
-        if (baseOptionModel.isSecret()) {
-            confType = PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.getOrDefault(baseOptionModel.getShortJavaType(), CONFIG_DEF_TYPE_PASSWORD);
+        if ("password".equals(kameletProperty.getFormat())) {
+            confType = PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.getOrDefault(type, CONFIG_DEF_TYPE_PASSWORD);
         } else {
-            confType = PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.getOrDefault(baseOptionModel.getShortJavaType(), CONFIG_DEF_TYPE_STRING);
+            confType = PRIMITIVE_TYPES_TO_KAFKA_CONFIG_DEF_MAP.getOrDefault(type, CONFIG_DEF_TYPE_STRING);
         }
-        String confPriority = baseOptionModel.isDeprecated() ? CONFIG_DEF_IMPORTANCE_LOW : CONFIG_DEF_IMPORTANCE_MEDIUM;
-        confPriority = baseOptionModel.isRequired() ? CONFIG_DEF_IMPORTANCE_HIGH : confPriority;
+        boolean isRequired = kameletRequiredProperties.contains(kameletProperty.getName());
+        String confPriority = isRequired ? CONFIG_DEF_IMPORTANCE_HIGH : CONFIG_DEF_IMPORTANCE_MEDIUM;
         confMethod.setBody(confMethod.getBody() + "conf.define(" + confFieldName + ", " + confType + ", " + defaultFieldName + ", " + confPriority + ", " + docFieldName + ");\n");
         CamelKafkaConnectorOptionModel optionModel = new CamelKafkaConnectorOptionModel();
         optionModel.setName(propertyValue);
         optionModel.setDescription(docLiteralInitializer);
         optionModel.setPriority(StringUtils.removeStart(confPriority, CONFIG_DEF_IMPORTANCE_PREFIX));
         optionModel.setDefaultValue(defaultValueClassLiteralInitializer.equals("null") ? null : defaultValueClassLiteralInitializer);
-        optionModel.setRequired(String.valueOf(baseOptionModel.isRequired()));
-        optionModel.setPossibleEnumValues(baseOptionModel.getEnums());
+        optionModel.setRequired(String.valueOf(isRequired));
+        //XXX: kamelets dose not support enum like properties type yet.
         listOptions.add(optionModel);
     }
 
@@ -710,9 +686,9 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         try {
             String template = null;
             if (ct.name().equals(ConnectorType.SINK.name())) {
-                template = loadText(CamelKafkaConnectorUpdateMojo.class.getClassLoader().getResourceAsStream("camel-kafka-connector-sink-options.mvel"));
+                template = loadText(CamelKafkaConnectorKameletUpdateMojo.class.getClassLoader().getResourceAsStream("camel-kafka-connector-sink-options.mvel"));
             } else if (ct.name().equals(ConnectorType.SOURCE.name())) {
-                template = loadText(CamelKafkaConnectorUpdateMojo.class.getClassLoader().getResourceAsStream("camel-kafka-connector-source-options.mvel"));
+                template = loadText(CamelKafkaConnectorKameletUpdateMojo.class.getClassLoader().getResourceAsStream("camel-kafka-connector-source-options.mvel"));
             }
             String out = (String)TemplateRuntime.eval(template, model, Collections.singletonMap("util", MvelHelper.INSTANCE));
             return out;
@@ -750,7 +726,6 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
     }
     
     private void writeDescriptors(File connectorDir, ConnectorType ct) throws MojoExecutionException {
-
         String title;
         if (getMainDepArtifactId().equalsIgnoreCase("camel-coap+tcp")) {
             title = "camel-coap-tcp";
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorUpdateMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorUpdateMojo.java
index dba04b2..5fb0727 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorUpdateMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/CamelKafkaConnectorUpdateMojo.java
@@ -94,7 +94,7 @@ import static org.apache.camel.tooling.util.PackageHelper.writeText;
 requiresDependencyCollection = ResolutionScope.COMPILE_PLUS_RUNTIME, 
 requiresDependencyResolution = ResolutionScope.COMPILE_PLUS_RUNTIME, 
 defaultPhase = LifecyclePhase.GENERATE_RESOURCES)
-public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMojo {
+public class CamelKafkaConnectorUpdateMojo extends AbstractCamelComponentKafkaConnectorMojo {
 
     private static final String GENERATED_SECTION_START = "START OF GENERATED CODE";
     private static final String GENERATED_SECTION_START_COMMENT = "<!--" + GENERATED_SECTION_START + "-->";
@@ -198,7 +198,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
 
     private void updateConnector() throws Exception {
         String sanitizedName = sanitizeMavenArtifactId(name);
-        // create the starter directory
+        // create the connector directory
         File connectorDir = new File(projectDir, "camel-" + sanitizedName + KAFKA_CONNECTORS_SUFFIX);
         if (!connectorDir.exists() || !connectorDir.isDirectory()) {
             getLog().info("Connector " + name + " can not be updated since directory " + connectorDir.getAbsolutePath() + " dose not exist.");
@@ -208,12 +208,12 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         // create the base pom.xml
         Document pom = createBasePom(connectorDir);
 
-        // Apply changes to the starter pom
+        // Apply changes to the connector pom
         fixExcludedDependencies(pom);
         fixAdditionalDependencies(pom, additionalDependencies);
         fixAdditionalRepositories(pom);
 
-        // Write the starter pom
+        // Write the connector pom
         File pomFile = new File(connectorDir, "pom.xml");
         writeXmlFormatted(pom, pomFile, getLog());
 
@@ -296,7 +296,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
         }
 
         if (!deps.isEmpty()) {
-            getLog().debug("The following dependencies will be added to the starter: " + deps);
+            getLog().debug("The following dependencies will be added to the connector: " + deps);
             MavenUtils.addDependencies(pom, deps, GENERATED_SECTION_START, GENERATED_SECTION_END);
         }
     }
@@ -348,7 +348,7 @@ public class CamelKafkaConnectorUpdateMojo extends AbstractCamelKafkaConnectorMo
                         pom = builder.parse(contentIn);
                     }
 
-                    getLog().debug("Reusing the existing pom.xml for the starter");
+                    getLog().debug("Reusing the existing pom.xml for the connector");
                     return pom;
                 } else {
                     getLog().error("Cannot use the existing pom.xml file since it is not editable. It does not contain " + GENERATED_SECTION_START_COMMENT);
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojo.java
index efbd808..aff55e1 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojo.java
@@ -16,23 +16,31 @@
  */
 package org.apache.camel.kafkaconnector.maven;
 
+import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import io.github.classgraph.ClassGraph;
+import io.github.classgraph.ScanResult;
 import org.apache.camel.catalog.CamelCatalog;
 import org.apache.camel.catalog.DefaultCamelCatalog;
+import org.apache.camel.kafkaconnector.maven.model.KameletModel;
 import org.apache.camel.kafkaconnector.maven.utils.MavenUtils;
-import org.apache.camel.tooling.model.ComponentModel;
-import org.apache.camel.tooling.model.JsonMapper;
+import org.apache.camel.kafkaconnector.maven.utils.YamlKameletMapper;
+import org.apache.camel.kamelets.catalog.KameletsCatalog;
 import org.apache.maven.ProjectDependenciesResolver;
 import org.apache.maven.execution.MavenSession;
 import org.apache.maven.plugin.BuildPluginManager;
@@ -71,9 +79,54 @@ public class GenerateCamelKafkaConnectorsMojo extends AbstractCamelKafkaConnecto
     private static final String GENERATED_SECTION_END = "END OF GENERATED CODE";
     private static final String GENERATED_SECTION_END_COMMENT = "<!--" + GENERATED_SECTION_END + "-->";
 
+    private static final String KAMELETS_DIR = "kamelets";
+    private static final String KAMELETS_FILE_SUFFIX = ".kamelet.yaml";
+
     @Parameter(property = "overridePomFile", required = false, defaultValue = "false")
     protected Boolean overridePomFile;
 
+    //components
+    /**
+     * The initial pom template file.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-template-pom.template")
+    protected String initialPomTemplate;
+
+    /**
+     * Properties file to configure additional dependencies.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-fix-dependencies.properties")
+    protected String fixDependenciesProperties;
+
+    /**
+     * A comma separated list of column separated GAV to include as dependencies
+     * to the generated camel kafka connector. (i.e.
+     * groupId:ArtifactId:version,groupId_2:ArtifactId_2:version_2)
+     */
+    @Parameter(defaultValue = "", readonly = true)
+    protected String additionalComponentDependencies;
+
+    //Kamelets
+    /**
+     * The initial kamelet pom template file.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-kamelet-template-pom.template")
+    protected String initialKameletPomTemplate;
+
+    /**
+     * Properties kamelet file to configure additional dependencies.
+     */
+    @Parameter(defaultValue = "camel-kafka-connector-kamelet-fix-dependencies.properties")
+    protected String fixKameletDependenciesProperties;
+
+    /**
+     * A comma separated list of column separated GAV to include as dependencies
+     * to the generated camel kafka connector. (i.e.
+     * groupId:ArtifactId:version,groupId_2:ArtifactId_2:version_2)
+     */
+    @Parameter(defaultValue = "", readonly = true)
+    private String additionalKameletDependencies;
+
     /**
      * The maven session.
      */
@@ -108,6 +161,91 @@ public class GenerateCamelKafkaConnectorsMojo extends AbstractCamelKafkaConnecto
 
     @Override
     protected void executeAll() throws MojoExecutionException, IOException, ResourceNotFoundException, FileResourceCreationException {
+        // load some project version properties
+        final Properties properties = new Properties();
+        try (InputStream stream = new FileInputStream(rm.getResourceAsFile("project.properties"))) {
+            properties.load(stream);
+        }
+
+        Map<String, String> kameletsResources = new HashMap<>();
+        Set<String> camelComponentsUsedInKamelets = new HashSet<>();
+        List<String> resourceNames;
+        try (ScanResult scanResult = new ClassGraph().acceptPaths("/" + KAMELETS_DIR + "/").scan()) {
+            resourceNames = scanResult.getAllResources().getPaths();
+        }
+        for (String fileName: resourceNames) {
+            String pathInJar = "/" + fileName;
+            String kamelet = new BufferedReader(
+                    new InputStreamReader(KameletsCatalog.class.getResourceAsStream(pathInJar), StandardCharsets.UTF_8))
+                    .lines()
+                    .collect(Collectors.joining("\n"));
+            KameletModel kameletModel = YamlKameletMapper.parseKameletYaml(kamelet);
+
+            // filter all kamelets with type not in {source,sink}
+            if ("source".equals(kameletModel.getType()) || "sink".equals(kameletModel.getType())) {
+                kameletsResources.put(kameletModel.getName(), kamelet);
+                camelComponentsUsedInKamelets.addAll(
+                        kameletModel.getDependencies().stream()
+                                .filter(d -> d.startsWith("camel:"))
+                                .map(d -> d.replaceFirst("camel:", ""))
+                                .collect(Collectors.toSet())
+                );
+            }
+            //TODO: add include (filter) / exclude mechanism
+            getLog().info("Kamelets found to be used to generate/update a kafka connector: " + kameletsResources.keySet());
+        }
+
+        for (String kamelet : kameletsResources.keySet()) {
+            executeMojo(
+                    plugin(
+                            groupId(properties.getProperty("groupId")),
+                            artifactId(properties.getProperty("artifactId")),
+                            version(properties.getProperty("version"))
+                    ),
+                    goal("camel-kafka-connector-kamelet-create"),
+                    configuration(
+                            element(name("name"), kamelet),
+                            element(name("initialKameletPomTemplate"), initialKameletPomTemplate),
+                            element(name("noticeTemplate"), noticeTemplate),
+                            element(name("licenseTemplate"), licenseTemplate),
+                            element(name("fixKameletDependenciesProperties"), fixKameletDependenciesProperties),
+                            element(name("packageFileTemplate"), packageFileTemplate),
+                            element(name("overridePomFile"), overridePomFile.toString()),
+                            element(name("connectorsProjectName"), connectorsProjectName)
+                    ),
+                    executionEnvironment(
+                            project,
+                            session,
+                            pluginManager
+                    )
+            );
+
+            executeMojo(
+                    plugin(
+                            groupId(properties.getProperty("groupId")),
+                            artifactId(properties.getProperty("artifactId")),
+                            version(properties.getProperty("version"))
+                    ),
+                    goal("camel-kafka-connector-kamelet-update"),
+                    configuration(
+                            element(name("additionalDependencies"), additionalComponentDependencies),
+                            element(name("name"), kamelet),
+                            element(name("kameletYaml"), kameletsResources.get(kamelet)),
+                            element(name("initialKameletPomTemplate"), initialKameletPomTemplate),
+                            element(name("noticeTemplate"), noticeTemplate),
+                            element(name("licenseTemplate"), licenseTemplate),
+                            element(name("fixKameletDependenciesProperties"), fixKameletDependenciesProperties),
+                            element(name("packageFileTemplate"), packageFileTemplate),
+                            element(name("connectorsProjectName"), connectorsProjectName)
+                    ),
+                    executionEnvironment(
+                            project,
+                            session,
+                            pluginManager
+                    )
+            );
+        }
+
         CamelCatalog cc = new DefaultCamelCatalog();
         List<String> components;
         List<String> filteredComponents;
@@ -117,31 +255,24 @@ public class GenerateCamelKafkaConnectorsMojo extends AbstractCamelKafkaConnecto
             Set<String> filterComponentNames = new HashSet<>(Arrays.asList(filter.split(",")));
             components = cc.findComponentNames().stream().filter(componentName -> filterComponentNames.contains(componentName)).collect(Collectors.toList());
         }
+        // exclude all components used in a kamelet
+        camelComponentsUsedInKamelets.addAll(excludedComponents);
+        excludedComponents = camelComponentsUsedInKamelets.stream().collect(Collectors.toList());
         if (excludedComponents == null || excludedComponents.isEmpty()) {
             filteredComponents = components;
         } else {
             filteredComponents = components.stream().filter(component -> !excludedComponents.contains(component)).collect(Collectors.toList());
         }
         if (filter != null && !filter.isEmpty()) {
-            getLog().info("Filtered Components that will be generated: " + filter);
+            getLog().info("Filtered Components that will be used to generate a kafka connector: " + filter);
         }
         if (excludedComponents != null && !excludedComponents.isEmpty()) {
-            getLog().info("Excluded Components that won't be generated: " + excludedComponents);
-        }
-        getLog().info("Components found to be generated/updated: " + filteredComponents);
-
-        //TODO: evaluate dataformats to include in each camel kafka connector generated placing them as a comma separated GAV in:
-        String additionalDependencies = "";
-
-        final Properties properties = new Properties();
-
-        try (InputStream stream = new FileInputStream(rm.getResourceAsFile("project.properties"))) {
-            properties.load(stream);
+            getLog().info("Excluded Components that won't be used to generate a kafka connector: " + excludedComponents);
         }
+        getLog().info("Components found to be used to generate/update a kafka connector: " + filteredComponents);
 
         for (String component : filteredComponents) {
             String cJson = cc.componentJSonSchema(component);
-            ComponentModel cm = JsonMapper.generateComponentModel(cJson);
 
             executeMojo(
                     plugin(
@@ -176,7 +307,7 @@ public class GenerateCamelKafkaConnectorsMojo extends AbstractCamelKafkaConnecto
                     ),
                     goal("camel-kafka-connector-update"),
                     configuration(
-                            element(name("additionalDependencies"), additionalDependencies),
+                            element(name("additionalDependencies"), additionalComponentDependencies),
                             element(name("name"), component),
                             element(name("componentJson"), cJson),
                             element(name("initialPomTemplate"), initialPomTemplate),
@@ -197,17 +328,19 @@ public class GenerateCamelKafkaConnectorsMojo extends AbstractCamelKafkaConnecto
         if (removeMissingComponents) {
             if (projectDir != null && projectDir.isDirectory()) {
                 // sanitize names, as there are some camel components with + signal which are sanitized when creating the kafka connector
-                List<String> sanitizedComponentNames = components.stream().map(MavenUtils::sanitizeMavenArtifactId).collect(Collectors.toList());
-                // retrieve the list of camel kafka connectors
-                String[] connectorNames = projectDir.list((dir, filename) -> filename.endsWith(KAFKA_CONNECTORS_SUFFIX));
-                if (connectorNames != null) {
-                    List<String> connectorsToRemove = Stream.of(connectorNames).sorted().filter(filename -> {
+                List<String> sanitizedGeneratedFromComponentsConnectorsNames = filteredComponents.stream().map(MavenUtils::sanitizeMavenArtifactId).collect(Collectors.toList());
+                List<String> sanitizedGeneratedFromKameletsConnectorsNames = kameletsResources.keySet().stream().map(MavenUtils::sanitizeMavenArtifactId).collect(Collectors.toList());
+                // retrieve the list of existing camel kafka connectors
+                String[] existingConnectorNames = projectDir.list((dir, filename) -> filename.endsWith(KAFKA_CONNECTORS_SUFFIX));
+                if (existingConnectorNames != null) {
+                    List<String> connectorsToRemove = Stream.of(existingConnectorNames).sorted().filter(filename -> {
                         String componentName = extractComponentName(filename);
-                        // set to remove connectors that are not in camel catalog or are explicitly excluded
-                        return !sanitizedComponentNames.contains(componentName) || excludedComponents.contains(componentName);
-
+                        // set to remove connectors that are not generated from camel components or a kamelet
+                        return !sanitizedGeneratedFromComponentsConnectorsNames.contains(componentName) && !sanitizedGeneratedFromKameletsConnectorsNames.contains(componentName);
                     }).collect(Collectors.toList());
 
+                    getLog().info("Connectors previously generated found to be removed: " + connectorsToRemove);
+
                     for (String component: connectorsToRemove) {
 
                         executeMojo(
@@ -234,7 +367,9 @@ public class GenerateCamelKafkaConnectorsMojo extends AbstractCamelKafkaConnecto
     }
 
     private String extractComponentName(String connectorName) {
+        // remove starting "camel-"
         String name = connectorName.substring("camel-".length());
+        // remove final KAFKA_CONNECTORS_SUFFIX
         return name.substring(0, name.length() - KAFKA_CONNECTORS_SUFFIX.length());
     }
 }
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletModel.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletModel.java
new file mode 100644
index 0000000..fc08186
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletModel.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.maven.model;
+
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class KameletModel {
+    private String name;
+    private String type;
+    private String description;
+    private Set<String> dependencies;
+    private List<KameletPropertyModel> properties;
+    private Set<String> requiredProperties;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Set<String> getDependencies() {
+        return ImmutableSet.copyOf(dependencies);
+    }
+
+    public void setDependencies(Set<String> dependencies) {
+        this.dependencies = dependencies;
+    }
+
+    public List<KameletPropertyModel> getProperties() {
+        return ImmutableList.copyOf(properties);
+    }
+
+    public void setProperties(List<KameletPropertyModel> properties) {
+        this.properties = properties;
+    }
+
+    public Set<String> getRequiredProperties() {
+        return ImmutableSet.copyOf(requiredProperties);
+    }
+
+    public void setRequiredProperties(Set<String> requiredProperties) {
+        this.requiredProperties = requiredProperties;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String toString() {
+        return "KameletModel{"
+                + "name='" + name + '\''
+                + ", type='" + type + '\''
+                + ", description='" + description + '\''
+                + ", dependencies=" + dependencies
+                + ", properties=" + properties
+                + ", requiredProperties=" + requiredProperties
+                + '}';
+    }
+}
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletPropertyModel.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletPropertyModel.java
new file mode 100644
index 0000000..31e9a5d
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/model/KameletPropertyModel.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.maven.model;
+
+public class KameletPropertyModel {
+    private String name;
+    private String type;
+    private String description;
+    private String example;
+    private String defaultValue;
+    private String format;
+    private String title;
+
+    @Override
+    public String toString() {
+        return "KameletPropertyModel{"
+                + "name='" + name + '\''
+                + ", type='" + type + '\''
+                + ", description='" + description + '\''
+                + ", example='" + example + '\''
+                + ", defaultValue='" + defaultValue + '\''
+                + ", format='" + format + '\''
+                + ", title='" + title + '\''
+                + '}';
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getExample() {
+        return example;
+    }
+
+    public void setExample(String example) {
+        this.example = example;
+    }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public void setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+
+    public String getFormat() {
+        return format;
+    }
+
+    public void setFormat(String format) {
+        this.format = format;
+    }
+
+    public String getTitle() {
+        return title;
+    }
+
+    public void setTitle(String title) {
+        this.title = title;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+}
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapper.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapper.java
new file mode 100644
index 0000000..6cae051
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.maven.utils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
+import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+import org.apache.camel.kafkaconnector.maven.model.KameletModel;
+import org.apache.camel.kafkaconnector.maven.model.KameletPropertyModel;
+
+public final class YamlKameletMapper {
+    public static final ObjectMapper YAML_MAPPER = new YAMLMapper()
+            .configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)
+            .configure(YAMLGenerator.Feature.LITERAL_BLOCK_STYLE, true)
+            .configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true);
+
+    private YamlKameletMapper() { }
+
+    public static KameletModel parseKameletYaml(URL src) throws IOException {
+        ObjectNode kamelet = YAML_MAPPER.readValue(src, ObjectNode.class);
+        return parseKameletYaml(kamelet);
+    }
+
+    public static KameletModel parseKameletYaml(String src) throws JsonProcessingException {
+        ObjectNode kamelet = YAML_MAPPER.readValue(src, ObjectNode.class);
+        return parseKameletYaml(kamelet);
+    }
+
+    public static KameletModel parseKameletYaml(ObjectNode kameletYamlNode) {
+        KameletModel km = new KameletModel();
+        km.setType(kameletYamlNode.at("/metadata/labels/camel.apache.org~1kamelet.type").asText());
+
+        km.setName(kameletYamlNode.at("/metadata/name").asText());
+
+        km.setDescription(kameletYamlNode.at("/spec/definition/description").asText());
+
+        Set<String> requiredProperties = new HashSet<>();
+        kameletYamlNode.at("/spec/definition/required").forEach(req -> requiredProperties.add(req.asText()));
+        km.setRequiredProperties(requiredProperties);
+
+        Set<String> dependencies = new HashSet<>();
+        kameletYamlNode.at("/spec/dependencies").forEach(req -> dependencies.add(req.asText()));
+        km.setDependencies(dependencies);
+
+        List<KameletPropertyModel> kpms = new ArrayList<>();
+        Iterator<Map.Entry<String, JsonNode>> it = kameletYamlNode.at("/spec/definition/properties").fields();
+        while (it.hasNext()) {
+            Map.Entry<String, JsonNode> property = it.next();
+            KameletPropertyModel kpm = new KameletPropertyModel();
+            kpm.setName(property.getKey());
+            JsonNode propertyFields = property.getValue();
+            kpm.setDefaultValue(propertyFields.get("default") != null ? propertyFields.get("default").asText() : null);
+            kpm.setExample(propertyFields.get("example") != null ? propertyFields.get("example").asText() : null);
+            kpm.setDescription(propertyFields.get("description") != null ? propertyFields.get("description").asText() : null);
+            kpm.setFormat(propertyFields.get("format") != null ? propertyFields.get("format").asText() : null);
+            kpm.setTitle(propertyFields.get("title") != null ? propertyFields.get("title").asText() : null);
+            kpm.setType(propertyFields.get("type") != null ? propertyFields.get("type").asText() : null);
+            kpms.add(kpm);
+        }
+        km.setProperties(kpms);
+
+        return km;
+    }
+}
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties
index 973fe74..526c574 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties
@@ -19,11 +19,9 @@
 # Use comma-separated values to include multiple dependencies
 
 # Global dependencies included in all modules (except the ones in the list)
-global=org.apache.camel.kafkaconnector:camel-kafka-connector
-
-camel-couchbase=com.couchbase.client:java-client::runtime
+global=
 
 # we add a default connection factory maven variables can be used as ${varname}
-camel-sjms2=org.apache.activemq:activemq-client::runtime,org.apache.activemq:artemis-jms-client::runtime
-exclude_camel-sjms2=
-additional_properties_camel-sjms2=camel.component.sjms2.connection-factory=#class:org.apache.activemq.ActiveMQConnectionFactory,camel.component.sjms2.connection-factory.brokerURL=tcp://localhost:61616
+#camel-sjms2=org.apache.activemq:activemq-client::runtime,org.apache.activemq:artemis-jms-client::runtime
+#exclude_camel-sjms2=
+#additional_properties_camel-sjms2=camel.component.sjms2.connection-factory=#class:org.apache.activemq.ActiveMQConnectionFactory,camel.component.sjms2.connection-factory.brokerURL=tcp://localhost:61616
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-kamelet-fix-dependencies.properties
similarity index 100%
copy from tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties
copy to tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-kamelet-fix-dependencies.properties
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-kamelet-template-pom.template b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-kamelet-template-pom.template
new file mode 100644
index 0000000..927ffdb
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-kamelet-template-pom.template
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>connectors</artifactId>
+        <version>${version}</version>
+    </parent>
+
+    <artifactId>camel-${componentSanitizedName}-kafka-connector</artifactId>
+    <name>Camel-Kafka-Connector :: ${componentName}</name>
+    <description>Camel Kafka Connector for ${componentDescription}</description>
+
+    <dependencies>
+        <!-- Kafka -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-transforms</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Camel -->
+        <!--START OF GENERATED CODE-->
+        <!--END OF GENERATED CODE-->
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version><#noparse>${version.maven.surefire.plugin}</#noparse></version>
+                <configuration>
+                    <failIfNoTests>false</failIfNoTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version><#noparse>${version.maven.jar}</#noparse></version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.5.1</version>
+                <inherited>true</inherited>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT.java
index 0c0881e..657cef7 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT.java
@@ -42,29 +42,17 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 class GenerateCamelKafkaConnectorsMojoIT {
 
     @MavenTest
+//    @MavenOption(MavenCLIOptions.DEBUG)
     public void test_generate(MavenExecutionResult result) throws IOException {
         assertThat(result).isSuccessful();
         assertThat(result)
             .out()
             .info()
-            .contains("Excluded Components that won't be generated: "
-                + "[bonita,"
-                + " bean-validator,"
-                + " browse,"
-                + " class,"
-                + " dataset,"
-                + " dataset-test,"
-                + " debezium-mongodb,"
-                + " debezium-mysql,"
-                + " debezium-postgres,"
-                + " debezium-sqlserver,"
-                + " digitalocean,"
-                + " mock,"
-                + " ref,"
-                + " robotframework"
-                + "]")
-            .anyMatch(s -> s.startsWith("Components found to be generated/updated: ["))
+            .anyMatch(s -> s.startsWith("Excluded Components that won't be used to generate a kafka connector: "))
+            .anyMatch(s -> s.startsWith("Components found to be used to generate/update a kafka connector: ["))
+            .anyMatch(s -> s.startsWith("Kamelets found to be used to generate/update a kafka connector: ["))
             .anyMatch(s -> s.startsWith("Creating camel kafka connector for"))
+            .anyMatch(s -> s.startsWith("Creating camel kafka kamelet connector for"))
             .containsSequence(
                 "Creating a new pom.xml for the connector from scratch",
                 "Creating a new package.xml for the connector.")
@@ -92,18 +80,18 @@ class GenerateCamelKafkaConnectorsMojoIT {
 
     private List<String> extractExcluded(List<String> stdout) {
         return stdout.stream()
-            .filter(s -> s.startsWith("[INFO] Excluded Components that won't be generated: ["))
+            .filter(s -> s.startsWith("[INFO] Excluded Components that won't be used to generate a kafka connector: ["))
             .findFirst()
-            .map(s -> Strings.between(s, "[INFO] Excluded Components that won't be generated: [", "]"))
+            .map(s -> Strings.between(s, "[INFO] Excluded Components that won't be used to generate a kafka connector: [", "]"))
             .map(s -> Arrays.asList(s.split(", ")))
             .orElse(Collections.emptyList());
     }
 
     private List<String> extractGenerated(List<String> stdout) {
         return stdout.stream()
-            .filter(s -> s.startsWith("[INFO] Components found to be generated/updated: ["))
+            .filter(s -> s.startsWith("[INFO] Components found to be used to generate/update a kafka connector: ["))
             .findFirst()
-            .map(s -> Strings.between(s, "[INFO] Components found to be generated/updated: [", "]"))
+            .map(s -> Strings.between(s, "[INFO] Components found to be used to generate/update a kafka connector: [", "]"))
             .map(s -> Arrays.asList(s.split(", ")))
             .orElse(Collections.emptyList());
     }
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapperTests.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapperTests.java
new file mode 100644
index 0000000..58761fc
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/java/org/apache/camel/kafkaconnector/maven/utils/YamlKameletMapperTests.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.maven.utils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.camel.kafkaconnector.maven.model.KameletModel;
+import org.apache.camel.kafkaconnector.maven.model.KameletPropertyModel;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class YamlKameletMapperTests {
+    @Test
+    public void parseKameletYamlTest() throws IOException {
+        KameletModel km = YamlKameletMapper.parseKameletYaml(YamlKameletMapperTests.class.getResource("/kamelets/aws-s3-source.kamelet.yaml"));
+
+        assertNotNull(km);
+        assertEquals("aws-s3-source", km.getName());
+        assertEquals("source", km.getType());
+        assertEquals("Receive data from AWS S3.", km.getDescription());
+
+        assertEquals(2, km.getDependencies().size());
+        assertTrue(km.getDependencies().contains("camel:aws2-s3"));
+        assertTrue(km.getDependencies().contains("camel:kamelet"));
+
+        assertEquals(4, km.getRequiredProperties().size());
+        assertTrue(km.getRequiredProperties().contains("bucketNameOrArn"));
+        assertTrue(km.getRequiredProperties().contains("accessKey"));
+        assertTrue(km.getRequiredProperties().contains("secretKey"));
+        assertTrue(km.getRequiredProperties().contains("region"));
+
+        assertEquals(7, km.getProperties().size());
+        List<KameletPropertyModel> regionProperty = km.getProperties().stream().filter(kpm -> "region".equals(kpm.getName())).collect(Collectors.toList());
+        assertEquals(1, regionProperty.size());
+        assertEquals("AWS Region", regionProperty.get(0).getTitle());
+        assertEquals("The AWS region to connect to", regionProperty.get(0).getDescription());
+        assertEquals("string", regionProperty.get(0).getType());
+        assertEquals("eu-west-1", regionProperty.get(0).getExample());
+
+        List<KameletPropertyModel> secretKeyProperty = km.getProperties().stream().filter(kpm -> "secretKey".equals(kpm.getName())).collect(Collectors.toList());
+        assertEquals(1, secretKeyProperty.size());
+        assertEquals("Secret Key", secretKeyProperty.get(0).getTitle());
+        assertEquals("The secret key obtained from AWS", secretKeyProperty.get(0).getDescription());
+        assertEquals("string", secretKeyProperty.get(0).getType());
+        assertEquals("password", secretKeyProperty.get(0).getFormat());
+    }
+
+    @Test
+    public void parseKameletYamlNoPropetiesTest() throws IOException {
+        KameletModel km = YamlKameletMapper.parseKameletYaml(YamlKameletMapperTests.class.getResource("/kamelets/noproperties.kamelet.yaml"));
+        assertNotNull(km);
+        assertTrue(km.getProperties().isEmpty());
+        assertTrue(km.getRequiredProperties().isEmpty());
+    }
+
+    @Test
+    public void parseKameletYamlNoDependenciesTest() throws IOException {
+        KameletModel km = YamlKameletMapper.parseKameletYaml(YamlKameletMapperTests.class.getResource("/kamelets/nodependencies.kamelet.yaml"));
+        assertNotNull(km);
+        assertTrue(km.getDependencies().isEmpty());
+    }
+}
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources-its/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT/test_generate/pom.xml b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources-its/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT/test_generate/pom.xml
index a734091..2a70d13 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources-its/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT/test_generate/pom.xml
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources-its/org/apache/camel/kafkaconnector/maven/GenerateCamelKafkaConnectorsMojoIT/test_generate/pom.xml
@@ -48,6 +48,9 @@
                     <initialPomTemplate>
                         ${project.build.testOutputDirectory}/template-connector-pom.xml
                     </initialPomTemplate>
+                    <initialKameletPomTemplate>
+                        ${project.build.testOutputDirectory}/template-connecotr-kamelet-pom.xml
+                    </initialKameletPomTemplate>
                     <noticeTemplate>
                         ${project.build.testOutputDirectory}/camel-kafka-connector-NOTICE.txt
                     </noticeTemplate>
@@ -57,6 +60,9 @@
                     <fixDependenciesProperties>
                         ${project.build.testOutputDirectory}/camel-kafka-connector-fix-dependencies.properties
                     </fixDependenciesProperties>
+                    <fixKameletDependenciesProperties>
+                        ${project.build.testOutputDirectory}/camel-kafka-connector-kamelet-fix-dependencies.properties
+                    </fixKameletDependenciesProperties>
                     <packageFileTemplate>
                         ${project.build.testOutputDirectory}/template-connector-package.xml
                     </packageFileTemplate>
@@ -98,6 +104,11 @@
                         <artifactId>camel-catalog</artifactId>
                         <version>@camel.version@</version>
                     </dependency>
+                    <dependency>
+                        <groupId>org.apache.camel.kamelets</groupId>
+                        <artifactId>camel-kamelets-catalog</artifactId>
+                        <version>@camel.kamelet.catalog.version@</version>
+                    </dependency>
                 </dependencies>
             </plugin>
         </plugins>
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/camel-kafka-connector-kamelet-fix-dependencies.properties
similarity index 100%
copy from tooling/camel-kafka-connector-generator-maven-plugin/src/main/resources/camel-kafka-connector-fix-dependencies.properties
copy to tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/camel-kafka-connector-kamelet-fix-dependencies.properties
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/aws-s3-source.kamelet.yaml b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/aws-s3-source.kamelet.yaml
new file mode 100644
index 0000000..a1a10f6
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/aws-s3-source.kamelet.yaml
@@ -0,0 +1,83 @@
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: aws-s3-source
+  annotations:
+    camel.apache.org/kamelet.support.level: "Preview"
+    camel.apache.org/catalog.version: "0.3.0"
+    camel.apache.org/kamelet.icon: " [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "AWS S3"
+  labels:
+    camel.apache.org/kamelet.type: "source"
+spec:
+  definition:
+    title: "AWS S3 Source"
+    description: |-
+      Receive data from AWS S3.
+    required:
+      - bucketNameOrArn
+      - accessKey
+      - secretKey
+      - region
+    type: object
+    properties:
+      bucketNameOrArn:
+        title: Bucket Name
+        description: The S3 Bucket name or ARN
+        type: string
+      deleteAfterRead:
+        title: Auto-delete Objects
+        description: Delete objects after consuming them
+        type: boolean
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: true
+      accessKey:
+        title: Access Key
+        description: The access key obtained from AWS
+        type: string
+        format: password
+        x-descriptors:
+        - urn:alm:descriptor:com.tectonic.ui:password
+      secretKey:
+        title: Secret Key
+        description: The secret key obtained from AWS
+        type: string
+        format: password
+        x-descriptors:
+        - urn:alm:descriptor:com.tectonic.ui:password
+      region:
+        title: AWS Region
+        description: The AWS region to connect to
+        type: string
+        example: eu-west-1
+      autoCreateBucket:
+        title: Autocreate Bucket
+        description: Setting the autocreation of the S3 bucket bucketName. 
+        type: boolean
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
+      includeBody:
+        title: Include Body
+        description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. 
+        type: boolean
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: true
+  dependencies:
+    - "camel:aws2-s3"
+    - "camel:kamelet"
+  flow:
+    from:
+      uri: "aws2-s3:{{bucketNameOrArn}}"
+      parameters:
+        autoCreateBucket: "{{autoCreateBucket}}"
+        secretKey: "{{secretKey}}"
+        accessKey: "{{accessKey}}"
+        region: "{{region}}"
+        includeBody: "{{includeBody}}"
+        deleteAfterRead: "{{deleteAfterRead}}"
+      steps:
+      - to: "kamelet:sink"
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/nodependencies.kamelet.yaml b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/nodependencies.kamelet.yaml
new file mode 100644
index 0000000..405f804
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/nodependencies.kamelet.yaml
@@ -0,0 +1,80 @@
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: aws-s3-source
+  annotations:
+    camel.apache.org/kamelet.support.level: "Preview"
+    camel.apache.org/catalog.version: "0.3.0"
+    camel.apache.org/kamelet.icon: " [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "AWS S3"
+  labels:
+    camel.apache.org/kamelet.type: "source"
+spec:
+  definition:
+    title: "AWS S3 Source"
+    description: |-
+      Receive data from AWS S3.
+    required:
+      - bucketNameOrArn
+      - accessKey
+      - secretKey
+      - region
+    type: object
+    properties:
+      bucketNameOrArn:
+        title: Bucket Name
+        description: The S3 Bucket name or ARN
+        type: string
+      deleteAfterRead:
+        title: Auto-delete Objects
+        description: Delete objects after consuming them
+        type: boolean
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: true
+      accessKey:
+        title: Access Key
+        description: The access key obtained from AWS
+        type: string
+        format: password
+        x-descriptors:
+        - urn:alm:descriptor:com.tectonic.ui:password
+      secretKey:
+        title: Secret Key
+        description: The secret key obtained from AWS
+        type: string
+        format: password
+        x-descriptors:
+        - urn:alm:descriptor:com.tectonic.ui:password
+      region:
+        title: AWS Region
+        description: The AWS region to connect to
+        type: string
+        example: eu-west-1
+      autoCreateBucket:
+        title: Autocreate Bucket
+        description: Setting the autocreation of the S3 bucket bucketName. 
+        type: boolean
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
+      includeBody:
+        title: Include Body
+        description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. 
+        type: boolean
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: true
+  flow:
+    from:
+      uri: "aws2-s3:{{bucketNameOrArn}}"
+      parameters:
+        autoCreateBucket: "{{autoCreateBucket}}"
+        secretKey: "{{secretKey}}"
+        accessKey: "{{accessKey}}"
+        region: "{{region}}"
+        includeBody: "{{includeBody}}"
+        deleteAfterRead: "{{deleteAfterRead}}"
+      steps:
+      - to: "kamelet:sink"
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/noproperties.kamelet.yaml b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/noproperties.kamelet.yaml
new file mode 100644
index 0000000..23f059e
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/kamelets/noproperties.kamelet.yaml
@@ -0,0 +1,33 @@
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: aws-s3-source
+  annotations:
+    camel.apache.org/kamelet.support.level: "Preview"
+    camel.apache.org/catalog.version: "0.3.0"
+    camel.apache.org/kamelet.icon: " [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "AWS S3"
+  labels:
+    camel.apache.org/kamelet.type: "source"
+spec:
+  definition:
+    title: "AWS S3 Source"
+    description: |-
+      Receive data from AWS S3.
+    type: object
+  dependencies:
+    - "camel:aws2-s3"
+    - "camel:kamelet"
+  flow:
+    from:
+      uri: "aws2-s3:{{bucketNameOrArn}}"
+      parameters:
+        autoCreateBucket: "{{autoCreateBucket}}"
+        secretKey: "{{secretKey}}"
+        accessKey: "{{accessKey}}"
+        region: "{{region}}"
+        includeBody: "{{includeBody}}"
+        deleteAfterRead: "{{deleteAfterRead}}"
+      steps:
+      - to: "kamelet:sink"
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/template-connecotr-kamelet-pom.xml b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/template-connecotr-kamelet-pom.xml
new file mode 100644
index 0000000..927ffdb
--- /dev/null
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/test/resources/template-connecotr-kamelet-pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>connectors</artifactId>
+        <version>${version}</version>
+    </parent>
+
+    <artifactId>camel-${componentSanitizedName}-kafka-connector</artifactId>
+    <name>Camel-Kafka-Connector :: ${componentName}</name>
+    <description>Camel Kafka Connector for ${componentDescription}</description>
+
+    <dependencies>
+        <!-- Kafka -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-transforms</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Camel -->
+        <!--START OF GENERATED CODE-->
+        <!--END OF GENERATED CODE-->
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version><#noparse>${version.maven.surefire.plugin}</#noparse></version>
+                <configuration>
+                    <failIfNoTests>false</failIfNoTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version><#noparse>${version.maven.jar}</#noparse></version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.5.1</version>
+                <inherited>true</inherited>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/tooling/pom.xml b/tooling/pom.xml
index a6e6f3f..630763c 100644
--- a/tooling/pom.xml
+++ b/tooling/pom.xml
@@ -51,6 +51,7 @@
         <jandex-version>2.1.1.Final</jandex-version>
         <mvel-version>2.4.12.Final</mvel-version>
         <roaster-version>2.20.1.Final</roaster-version>
+        <classgraph.version>4.8.110</classgraph.version>
     </properties>
 
     <build>