You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/04/25 23:40:05 UTC

[camel-kafka-connector] branch kamelets created (now 5df87a0)

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a change to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 5df87a0  chore: regen.

This branch includes the following new commits:

     new ae61051  Related to #423 : caonverted source and sink to use camel-kamelets.
     new 5df87a0  chore: regen.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[camel-kafka-connector] 02/02: chore: regen.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 5df87a0628a7d1ad801507b125ff1fe1a6f971f7
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Apr 26 01:38:56 2021 +0200

    chore: regen.
---
 .../src/generated/resources/connectors/camel-cql-sink.json       | 4 ++--
 .../src/generated/resources/connectors/camel-cql-source.json     | 4 ++--
 .../src/generated/resources/connectors/camel-solr-sink.json      | 7 +++++++
 .../src/generated/resources/connectors/camel-solrCloud-sink.json | 7 +++++++
 .../src/generated/resources/connectors/camel-solrs-sink.json     | 7 +++++++
 .../resources/connectors/camel-twitter-timeline-sink.json        | 9 ++++++++-
 .../resources/connectors/camel-twitter-timeline-source.json      | 9 ++++++++-
 .../src/generated/resources/camel-cql-sink.json                  | 4 ++--
 .../src/generated/resources/camel-cql-source.json                | 4 ++--
 .../src/main/docs/camel-cql-kafka-sink-connector.adoc            | 4 ++--
 .../src/main/docs/camel-cql-kafka-source-connector.adoc          | 4 ++--
 .../camel/kafkaconnector/cql/CamelCqlSinkConnectorConfig.java    | 4 ++--
 .../camel/kafkaconnector/cql/CamelCqlSourceConnectorConfig.java  | 4 ++--
 .../src/generated/resources/camel-solr-sink.json                 | 7 +++++++
 .../src/main/docs/camel-solr-kafka-sink-connector.adoc           | 3 ++-
 .../camel/kafkaconnector/solr/CamelSolrSinkConnectorConfig.java  | 4 ++++
 .../src/generated/resources/camel-solrCloud-sink.json            | 7 +++++++
 .../src/main/docs/camel-solrCloud-kafka-sink-connector.adoc      | 3 ++-
 .../solrcloud/CamelSolrcloudSinkConnectorConfig.java             | 4 ++++
 .../src/generated/resources/camel-solrs-sink.json                | 7 +++++++
 .../src/main/docs/camel-solrs-kafka-sink-connector.adoc          | 3 ++-
 .../kafkaconnector/solrs/CamelSolrsSinkConnectorConfig.java      | 4 ++++
 .../src/generated/resources/camel-twitter-timeline-sink.json     | 9 ++++++++-
 .../src/generated/resources/camel-twitter-timeline-source.json   | 9 ++++++++-
 .../main/docs/camel-twitter-timeline-kafka-sink-connector.adoc   | 5 +++--
 .../main/docs/camel-twitter-timeline-kafka-source-connector.adoc | 5 +++--
 .../docs/examples/CamelTwittertimelineSinkConnector.properties   | 2 +-
 .../docs/examples/CamelTwittertimelineSourceConnector.properties | 2 +-
 .../twittertimeline/CamelTwittertimelineSinkConnectorConfig.java | 6 +++++-
 .../CamelTwittertimelineSourceConnectorConfig.java               | 6 +++++-
 .../ROOT/pages/connectors/camel-cql-kafka-sink-connector.adoc    | 4 ++--
 .../ROOT/pages/connectors/camel-cql-kafka-source-connector.adoc  | 4 ++--
 .../ROOT/pages/connectors/camel-solr-kafka-sink-connector.adoc   | 3 ++-
 .../pages/connectors/camel-solrCloud-kafka-sink-connector.adoc   | 3 ++-
 .../ROOT/pages/connectors/camel-solrs-kafka-sink-connector.adoc  | 3 ++-
 .../connectors/camel-twitter-timeline-kafka-sink-connector.adoc  | 5 +++--
 .../camel-twitter-timeline-kafka-source-connector.adoc           | 5 +++--
 37 files changed, 142 insertions(+), 42 deletions(-)

diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-sink.json
index c16e57a..e54b6b5 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-sink.json
@@ -17,13 +17,13 @@
 		},
 		"camel.sink.path.hosts": {
 			"name": "camel.sink.path.hosts",
-			"description": "Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma.",
+			"description": "Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma.",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
 		"camel.sink.path.port": {
 			"name": "camel.sink.path.port",
-			"description": "Port number of cassansdra server(s)",
+			"description": "Port number of Cassandra server(s)",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-source.json
index e2276f1..dc6bf84 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-cql-source.json
@@ -17,13 +17,13 @@
 		},
 		"camel.source.path.hosts": {
 			"name": "camel.source.path.hosts",
-			"description": "Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma.",
+			"description": "Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma.",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
 		"camel.source.path.port": {
 			"name": "camel.source.path.port",
-			"description": "Port number of cassansdra server(s)",
+			"description": "Port number of Cassandra server(s)",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solr-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solr-sink.json
index faeaca3..9369114 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solr-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solr-sink.json
@@ -21,6 +21,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.autoCommit": {
+			"name": "camel.sink.endpoint.autoCommit",
+			"description": "If true, each producer operation will be committed automatically",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.connectionTimeout": {
 			"name": "camel.sink.endpoint.connectionTimeout",
 			"description": "connectionTimeout on the underlying HttpConnectionManager",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrCloud-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrCloud-sink.json
index 551bd64..508c2ec 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrCloud-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrCloud-sink.json
@@ -21,6 +21,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.autoCommit": {
+			"name": "camel.sink.endpoint.autoCommit",
+			"description": "If true, each producer operation will be committed automatically",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.connectionTimeout": {
 			"name": "camel.sink.endpoint.connectionTimeout",
 			"description": "connectionTimeout on the underlying HttpConnectionManager",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrs-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrs-sink.json
index f15e4c2..93ad59e 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrs-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-solrs-sink.json
@@ -21,6 +21,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.autoCommit": {
+			"name": "camel.sink.endpoint.autoCommit",
+			"description": "If true, each producer operation will be committed automatically",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.connectionTimeout": {
 			"name": "camel.sink.endpoint.connectionTimeout",
 			"description": "connectionTimeout on the underlying HttpConnectionManager",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-sink.json
index f3b6c2f..f63a946 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-sink.json
@@ -11,7 +11,7 @@
 	"properties": {
 		"camel.sink.path.timelineType": {
 			"name": "camel.sink.path.timelineType",
-			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]",
+			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]",
 			"priority": "HIGH",
 			"required": "true",
 			"enum": [
@@ -20,9 +20,16 @@
 				"USER",
 				"MENTIONS",
 				"RETWEETSOFME",
+				"LIST",
 				"UNKNOWN"
 			]
 		},
+		"camel.sink.endpoint.list": {
+			"name": "camel.sink.endpoint.list",
+			"description": "The list name when using timelineType=list",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.user": {
 			"name": "camel.sink.endpoint.user",
 			"description": "The username when using timelineType=user",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-source.json
index f2464ea..db78afa 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-twitter-timeline-source.json
@@ -11,7 +11,7 @@
 	"properties": {
 		"camel.source.path.timelineType": {
 			"name": "camel.source.path.timelineType",
-			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]",
+			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]",
 			"priority": "HIGH",
 			"required": "true",
 			"enum": [
@@ -20,9 +20,16 @@
 				"USER",
 				"MENTIONS",
 				"RETWEETSOFME",
+				"LIST",
 				"UNKNOWN"
 			]
 		},
+		"camel.source.endpoint.list": {
+			"name": "camel.source.endpoint.list",
+			"description": "The list name when using timelineType=list",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.user": {
 			"name": "camel.source.endpoint.user",
 			"description": "The username when using timelineType=user",
diff --git a/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-sink.json b/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-sink.json
index c16e57a..e54b6b5 100644
--- a/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-sink.json
+++ b/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-sink.json
@@ -17,13 +17,13 @@
 		},
 		"camel.sink.path.hosts": {
 			"name": "camel.sink.path.hosts",
-			"description": "Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma.",
+			"description": "Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma.",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
 		"camel.sink.path.port": {
 			"name": "camel.sink.path.port",
-			"description": "Port number of cassansdra server(s)",
+			"description": "Port number of Cassandra server(s)",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-source.json b/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-source.json
index e2276f1..dc6bf84 100644
--- a/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-source.json
+++ b/connectors/camel-cql-kafka-connector/src/generated/resources/camel-cql-source.json
@@ -17,13 +17,13 @@
 		},
 		"camel.source.path.hosts": {
 			"name": "camel.source.path.hosts",
-			"description": "Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma.",
+			"description": "Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma.",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
 		"camel.source.path.port": {
 			"name": "camel.source.path.port",
-			"description": "Port number of cassansdra server(s)",
+			"description": "Port number of Cassandra server(s)",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-sink-connector.adoc b/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-sink-connector.adoc
index f7ec7a5..016d380 100644
--- a/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-sink-connector.adoc
+++ b/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-sink-connector.adoc
@@ -32,8 +32,8 @@ The camel-cql sink connector supports 17 options, which are listed below.
 |===
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.beanRef* | beanRef is defined using bean:id | null | false | MEDIUM
-| *camel.sink.path.hosts* | Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
-| *camel.sink.path.port* | Port number of cassansdra server(s) | null | false | MEDIUM
+| *camel.sink.path.hosts* | Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
+| *camel.sink.path.port* | Port number of Cassandra server(s) | null | false | MEDIUM
 | *camel.sink.path.keyspace* | Keyspace to use | null | false | MEDIUM
 | *camel.sink.endpoint.clusterName* | Cluster name | null | false | MEDIUM
 | *camel.sink.endpoint.consistencyLevel* | Consistency level to use One of: [ANY] [ONE] [TWO] [THREE] [QUORUM] [ALL] [LOCAL_ONE] [LOCAL_QUORUM] [EACH_QUORUM] [SERIAL] [LOCAL_SERIAL] | null | false | MEDIUM
diff --git a/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-source-connector.adoc b/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-source-connector.adoc
index fbf7210..23ae1a6 100644
--- a/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-source-connector.adoc
+++ b/connectors/camel-cql-kafka-connector/src/main/docs/camel-cql-kafka-source-connector.adoc
@@ -32,8 +32,8 @@ The camel-cql source connector supports 35 options, which are listed below.
 |===
 | Name | Description | Default | Required | Priority
 | *camel.source.path.beanRef* | beanRef is defined using bean:id | null | false | MEDIUM
-| *camel.source.path.hosts* | Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
-| *camel.source.path.port* | Port number of cassansdra server(s) | null | false | MEDIUM
+| *camel.source.path.hosts* | Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
+| *camel.source.path.port* | Port number of Cassandra server(s) | null | false | MEDIUM
 | *camel.source.path.keyspace* | Keyspace to use | null | false | MEDIUM
 | *camel.source.endpoint.clusterName* | Cluster name | null | false | MEDIUM
 | *camel.source.endpoint.consistencyLevel* | Consistency level to use One of: [ANY] [ONE] [TWO] [THREE] [QUORUM] [ALL] [LOCAL_ONE] [LOCAL_QUORUM] [EACH_QUORUM] [SERIAL] [LOCAL_SERIAL] | null | false | MEDIUM
diff --git a/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSinkConnectorConfig.java b/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSinkConnectorConfig.java
index be1eca6..eb331a9 100644
--- a/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSinkConnectorConfig.java
+++ b/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSinkConnectorConfig.java
@@ -28,10 +28,10 @@ public class CamelCqlSinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_CQL_PATH_BEAN_REF_DOC = "beanRef is defined using bean:id";
     public static final String CAMEL_SINK_CQL_PATH_BEAN_REF_DEFAULT = null;
     public static final String CAMEL_SINK_CQL_PATH_HOSTS_CONF = "camel.sink.path.hosts";
-    public static final String CAMEL_SINK_CQL_PATH_HOSTS_DOC = "Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma.";
+    public static final String CAMEL_SINK_CQL_PATH_HOSTS_DOC = "Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma.";
     public static final String CAMEL_SINK_CQL_PATH_HOSTS_DEFAULT = null;
     public static final String CAMEL_SINK_CQL_PATH_PORT_CONF = "camel.sink.path.port";
-    public static final String CAMEL_SINK_CQL_PATH_PORT_DOC = "Port number of cassansdra server(s)";
+    public static final String CAMEL_SINK_CQL_PATH_PORT_DOC = "Port number of Cassandra server(s)";
     public static final String CAMEL_SINK_CQL_PATH_PORT_DEFAULT = null;
     public static final String CAMEL_SINK_CQL_PATH_KEYSPACE_CONF = "camel.sink.path.keyspace";
     public static final String CAMEL_SINK_CQL_PATH_KEYSPACE_DOC = "Keyspace to use";
diff --git a/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSourceConnectorConfig.java b/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSourceConnectorConfig.java
index 71c3cf5..b70aea2 100644
--- a/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSourceConnectorConfig.java
+++ b/connectors/camel-cql-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/cql/CamelCqlSourceConnectorConfig.java
@@ -28,10 +28,10 @@ public class CamelCqlSourceConnectorConfig extends CamelSourceConnectorConfig {
     public static final String CAMEL_SOURCE_CQL_PATH_BEAN_REF_DOC = "beanRef is defined using bean:id";
     public static final String CAMEL_SOURCE_CQL_PATH_BEAN_REF_DEFAULT = null;
     public static final String CAMEL_SOURCE_CQL_PATH_HOSTS_CONF = "camel.source.path.hosts";
-    public static final String CAMEL_SOURCE_CQL_PATH_HOSTS_DOC = "Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma.";
+    public static final String CAMEL_SOURCE_CQL_PATH_HOSTS_DOC = "Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma.";
     public static final String CAMEL_SOURCE_CQL_PATH_HOSTS_DEFAULT = null;
     public static final String CAMEL_SOURCE_CQL_PATH_PORT_CONF = "camel.source.path.port";
-    public static final String CAMEL_SOURCE_CQL_PATH_PORT_DOC = "Port number of cassansdra server(s)";
+    public static final String CAMEL_SOURCE_CQL_PATH_PORT_DOC = "Port number of Cassandra server(s)";
     public static final String CAMEL_SOURCE_CQL_PATH_PORT_DEFAULT = null;
     public static final String CAMEL_SOURCE_CQL_PATH_KEYSPACE_CONF = "camel.source.path.keyspace";
     public static final String CAMEL_SOURCE_CQL_PATH_KEYSPACE_DOC = "Keyspace to use";
diff --git a/connectors/camel-solr-kafka-connector/src/generated/resources/camel-solr-sink.json b/connectors/camel-solr-kafka-connector/src/generated/resources/camel-solr-sink.json
index faeaca3..9369114 100644
--- a/connectors/camel-solr-kafka-connector/src/generated/resources/camel-solr-sink.json
+++ b/connectors/camel-solr-kafka-connector/src/generated/resources/camel-solr-sink.json
@@ -21,6 +21,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.autoCommit": {
+			"name": "camel.sink.endpoint.autoCommit",
+			"description": "If true, each producer operation will be committed automatically",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.connectionTimeout": {
 			"name": "camel.sink.endpoint.connectionTimeout",
 			"description": "connectionTimeout on the underlying HttpConnectionManager",
diff --git a/connectors/camel-solr-kafka-connector/src/main/docs/camel-solr-kafka-sink-connector.adoc b/connectors/camel-solr-kafka-connector/src/main/docs/camel-solr-kafka-sink-connector.adoc
index 7d95c36..3f2aff1 100644
--- a/connectors/camel-solr-kafka-connector/src/main/docs/camel-solr-kafka-sink-connector.adoc
+++ b/connectors/camel-solr-kafka-connector/src/main/docs/camel-solr-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.solr.CamelSolrSinkConnector
 ----
 
 
-The camel-solr sink connector supports 18 options, which are listed below.
+The camel-solr sink connector supports 19 options, which are listed below.
 
 
 
@@ -33,6 +33,7 @@ The camel-solr sink connector supports 18 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.url* | Hostname and port for the solr server | null | true | HIGH
 | *camel.sink.endpoint.allowCompression* | Server side must support gzip or deflate for this to have any effect | null | false | MEDIUM
+| *camel.sink.endpoint.autoCommit* | If true, each producer operation will be committed automatically | false | false | MEDIUM
 | *camel.sink.endpoint.connectionTimeout* | connectionTimeout on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.defaultMaxConnectionsPerHost* | maxConnectionsPerHost on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.followRedirects* | indicates whether redirects are used to get to the Solr server | null | false | MEDIUM
diff --git a/connectors/camel-solr-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solr/CamelSolrSinkConnectorConfig.java b/connectors/camel-solr-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solr/CamelSolrSinkConnectorConfig.java
index 6951938..e8275ca 100644
--- a/connectors/camel-solr-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solr/CamelSolrSinkConnectorConfig.java
+++ b/connectors/camel-solr-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solr/CamelSolrSinkConnectorConfig.java
@@ -30,6 +30,9 @@ public class CamelSolrSinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_SOLR_ENDPOINT_ALLOW_COMPRESSION_CONF = "camel.sink.endpoint.allowCompression";
     public static final String CAMEL_SINK_SOLR_ENDPOINT_ALLOW_COMPRESSION_DOC = "Server side must support gzip or deflate for this to have any effect";
     public static final String CAMEL_SINK_SOLR_ENDPOINT_ALLOW_COMPRESSION_DEFAULT = null;
+    public static final String CAMEL_SINK_SOLR_ENDPOINT_AUTO_COMMIT_CONF = "camel.sink.endpoint.autoCommit";
+    public static final String CAMEL_SINK_SOLR_ENDPOINT_AUTO_COMMIT_DOC = "If true, each producer operation will be committed automatically";
+    public static final Boolean CAMEL_SINK_SOLR_ENDPOINT_AUTO_COMMIT_DEFAULT = false;
     public static final String CAMEL_SINK_SOLR_ENDPOINT_CONNECTION_TIMEOUT_CONF = "camel.sink.endpoint.connectionTimeout";
     public static final String CAMEL_SINK_SOLR_ENDPOINT_CONNECTION_TIMEOUT_DOC = "connectionTimeout on the underlying HttpConnectionManager";
     public static final String CAMEL_SINK_SOLR_ENDPOINT_CONNECTION_TIMEOUT_DEFAULT = null;
@@ -93,6 +96,7 @@ public class CamelSolrSinkConnectorConfig extends CamelSinkConnectorConfig {
         ConfigDef conf = new ConfigDef(CamelSinkConnectorConfig.conf());
         conf.define(CAMEL_SINK_SOLR_PATH_URL_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLR_PATH_URL_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SINK_SOLR_PATH_URL_DOC);
         conf.define(CAMEL_SINK_SOLR_ENDPOINT_ALLOW_COMPRESSION_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLR_ENDPOINT_ALLOW_COMPRESSION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLR_ENDPOINT_ALLOW_COMPRESSION_DOC);
+        conf.define(CAMEL_SINK_SOLR_ENDPOINT_AUTO_COMMIT_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_SOLR_ENDPOINT_AUTO_COMMIT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLR_ENDPOINT_AUTO_COMMIT_DOC);
         conf.define(CAMEL_SINK_SOLR_ENDPOINT_CONNECTION_TIMEOUT_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLR_ENDPOINT_CONNECTION_TIMEOUT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLR_ENDPOINT_CONNECTION_TIMEOUT_DOC);
         conf.define(CAMEL_SINK_SOLR_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLR_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLR_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_DOC);
         conf.define(CAMEL_SINK_SOLR_ENDPOINT_FOLLOW_REDIRECTS_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLR_ENDPOINT_FOLLOW_REDIRECTS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLR_ENDPOINT_FOLLOW_REDIRECTS_DOC);
diff --git a/connectors/camel-solrcloud-kafka-connector/src/generated/resources/camel-solrCloud-sink.json b/connectors/camel-solrcloud-kafka-connector/src/generated/resources/camel-solrCloud-sink.json
index 551bd64..508c2ec 100644
--- a/connectors/camel-solrcloud-kafka-connector/src/generated/resources/camel-solrCloud-sink.json
+++ b/connectors/camel-solrcloud-kafka-connector/src/generated/resources/camel-solrCloud-sink.json
@@ -21,6 +21,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.autoCommit": {
+			"name": "camel.sink.endpoint.autoCommit",
+			"description": "If true, each producer operation will be committed automatically",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.connectionTimeout": {
 			"name": "camel.sink.endpoint.connectionTimeout",
 			"description": "connectionTimeout on the underlying HttpConnectionManager",
diff --git a/connectors/camel-solrcloud-kafka-connector/src/main/docs/camel-solrCloud-kafka-sink-connector.adoc b/connectors/camel-solrcloud-kafka-connector/src/main/docs/camel-solrCloud-kafka-sink-connector.adoc
index 765b2a7..9a3849b 100644
--- a/connectors/camel-solrcloud-kafka-connector/src/main/docs/camel-solrCloud-kafka-sink-connector.adoc
+++ b/connectors/camel-solrcloud-kafka-connector/src/main/docs/camel-solrCloud-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.solrcloud.CamelSolrcloudSinkConn
 ----
 
 
-The camel-solrCloud sink connector supports 18 options, which are listed below.
+The camel-solrCloud sink connector supports 19 options, which are listed below.
 
 
 
@@ -33,6 +33,7 @@ The camel-solrCloud sink connector supports 18 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.url* | Hostname and port for the solr server | null | true | HIGH
 | *camel.sink.endpoint.allowCompression* | Server side must support gzip or deflate for this to have any effect | null | false | MEDIUM
+| *camel.sink.endpoint.autoCommit* | If true, each producer operation will be committed automatically | false | false | MEDIUM
 | *camel.sink.endpoint.connectionTimeout* | connectionTimeout on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.defaultMaxConnectionsPerHost* | maxConnectionsPerHost on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.followRedirects* | indicates whether redirects are used to get to the Solr server | null | false | MEDIUM
diff --git a/connectors/camel-solrcloud-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrcloud/CamelSolrcloudSinkConnectorConfig.java b/connectors/camel-solrcloud-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrcloud/CamelSolrcloudSinkConnectorConfig.java
index 34b3f78..c76b8be 100644
--- a/connectors/camel-solrcloud-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrcloud/CamelSolrcloudSinkConnectorConfig.java
+++ b/connectors/camel-solrcloud-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrcloud/CamelSolrcloudSinkConnectorConfig.java
@@ -32,6 +32,9 @@ public class CamelSolrcloudSinkConnectorConfig
     public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_ALLOW_COMPRESSION_CONF = "camel.sink.endpoint.allowCompression";
     public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_ALLOW_COMPRESSION_DOC = "Server side must support gzip or deflate for this to have any effect";
     public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_ALLOW_COMPRESSION_DEFAULT = null;
+    public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_AUTO_COMMIT_CONF = "camel.sink.endpoint.autoCommit";
+    public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_AUTO_COMMIT_DOC = "If true, each producer operation will be committed automatically";
+    public static final Boolean CAMEL_SINK_SOLRCLOUD_ENDPOINT_AUTO_COMMIT_DEFAULT = false;
     public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_CONNECTION_TIMEOUT_CONF = "camel.sink.endpoint.connectionTimeout";
     public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_CONNECTION_TIMEOUT_DOC = "connectionTimeout on the underlying HttpConnectionManager";
     public static final String CAMEL_SINK_SOLRCLOUD_ENDPOINT_CONNECTION_TIMEOUT_DEFAULT = null;
@@ -95,6 +98,7 @@ public class CamelSolrcloudSinkConnectorConfig
         ConfigDef conf = new ConfigDef(CamelSinkConnectorConfig.conf());
         conf.define(CAMEL_SINK_SOLRCLOUD_PATH_URL_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRCLOUD_PATH_URL_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SINK_SOLRCLOUD_PATH_URL_DOC);
         conf.define(CAMEL_SINK_SOLRCLOUD_ENDPOINT_ALLOW_COMPRESSION_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRCLOUD_ENDPOINT_ALLOW_COMPRESSION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRCLOUD_ENDPOINT_ALLOW_COMPRESSION_DOC);
+        conf.define(CAMEL_SINK_SOLRCLOUD_ENDPOINT_AUTO_COMMIT_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_SOLRCLOUD_ENDPOINT_AUTO_COMMIT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRCLOUD_ENDPOINT_AUTO_COMMIT_DOC);
         conf.define(CAMEL_SINK_SOLRCLOUD_ENDPOINT_CONNECTION_TIMEOUT_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRCLOUD_ENDPOINT_CONNECTION_TIMEOUT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRCLOUD_ENDPOINT_CONNECTION_TIMEOUT_DOC);
         conf.define(CAMEL_SINK_SOLRCLOUD_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRCLOUD_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRCLOUD_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_DOC);
         conf.define(CAMEL_SINK_SOLRCLOUD_ENDPOINT_FOLLOW_REDIRECTS_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRCLOUD_ENDPOINT_FOLLOW_REDIRECTS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRCLOUD_ENDPOINT_FOLLOW_REDIRECTS_DOC);
diff --git a/connectors/camel-solrs-kafka-connector/src/generated/resources/camel-solrs-sink.json b/connectors/camel-solrs-kafka-connector/src/generated/resources/camel-solrs-sink.json
index f15e4c2..93ad59e 100644
--- a/connectors/camel-solrs-kafka-connector/src/generated/resources/camel-solrs-sink.json
+++ b/connectors/camel-solrs-kafka-connector/src/generated/resources/camel-solrs-sink.json
@@ -21,6 +21,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.autoCommit": {
+			"name": "camel.sink.endpoint.autoCommit",
+			"description": "If true, each producer operation will be committed automatically",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.connectionTimeout": {
 			"name": "camel.sink.endpoint.connectionTimeout",
 			"description": "connectionTimeout on the underlying HttpConnectionManager",
diff --git a/connectors/camel-solrs-kafka-connector/src/main/docs/camel-solrs-kafka-sink-connector.adoc b/connectors/camel-solrs-kafka-connector/src/main/docs/camel-solrs-kafka-sink-connector.adoc
index 2fdc077..659a831 100644
--- a/connectors/camel-solrs-kafka-connector/src/main/docs/camel-solrs-kafka-sink-connector.adoc
+++ b/connectors/camel-solrs-kafka-connector/src/main/docs/camel-solrs-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.solrs.CamelSolrsSinkConnector
 ----
 
 
-The camel-solrs sink connector supports 18 options, which are listed below.
+The camel-solrs sink connector supports 19 options, which are listed below.
 
 
 
@@ -33,6 +33,7 @@ The camel-solrs sink connector supports 18 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.url* | Hostname and port for the solr server | null | true | HIGH
 | *camel.sink.endpoint.allowCompression* | Server side must support gzip or deflate for this to have any effect | null | false | MEDIUM
+| *camel.sink.endpoint.autoCommit* | If true, each producer operation will be committed automatically | false | false | MEDIUM
 | *camel.sink.endpoint.connectionTimeout* | connectionTimeout on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.defaultMaxConnectionsPerHost* | maxConnectionsPerHost on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.followRedirects* | indicates whether redirects are used to get to the Solr server | null | false | MEDIUM
diff --git a/connectors/camel-solrs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrs/CamelSolrsSinkConnectorConfig.java b/connectors/camel-solrs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrs/CamelSolrsSinkConnectorConfig.java
index bf2653b..0e82d27 100644
--- a/connectors/camel-solrs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrs/CamelSolrsSinkConnectorConfig.java
+++ b/connectors/camel-solrs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/solrs/CamelSolrsSinkConnectorConfig.java
@@ -30,6 +30,9 @@ public class CamelSolrsSinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_SOLRS_ENDPOINT_ALLOW_COMPRESSION_CONF = "camel.sink.endpoint.allowCompression";
     public static final String CAMEL_SINK_SOLRS_ENDPOINT_ALLOW_COMPRESSION_DOC = "Server side must support gzip or deflate for this to have any effect";
     public static final String CAMEL_SINK_SOLRS_ENDPOINT_ALLOW_COMPRESSION_DEFAULT = null;
+    public static final String CAMEL_SINK_SOLRS_ENDPOINT_AUTO_COMMIT_CONF = "camel.sink.endpoint.autoCommit";
+    public static final String CAMEL_SINK_SOLRS_ENDPOINT_AUTO_COMMIT_DOC = "If true, each producer operation will be committed automatically";
+    public static final Boolean CAMEL_SINK_SOLRS_ENDPOINT_AUTO_COMMIT_DEFAULT = false;
     public static final String CAMEL_SINK_SOLRS_ENDPOINT_CONNECTION_TIMEOUT_CONF = "camel.sink.endpoint.connectionTimeout";
     public static final String CAMEL_SINK_SOLRS_ENDPOINT_CONNECTION_TIMEOUT_DOC = "connectionTimeout on the underlying HttpConnectionManager";
     public static final String CAMEL_SINK_SOLRS_ENDPOINT_CONNECTION_TIMEOUT_DEFAULT = null;
@@ -93,6 +96,7 @@ public class CamelSolrsSinkConnectorConfig extends CamelSinkConnectorConfig {
         ConfigDef conf = new ConfigDef(CamelSinkConnectorConfig.conf());
         conf.define(CAMEL_SINK_SOLRS_PATH_URL_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRS_PATH_URL_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SINK_SOLRS_PATH_URL_DOC);
         conf.define(CAMEL_SINK_SOLRS_ENDPOINT_ALLOW_COMPRESSION_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRS_ENDPOINT_ALLOW_COMPRESSION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRS_ENDPOINT_ALLOW_COMPRESSION_DOC);
+        conf.define(CAMEL_SINK_SOLRS_ENDPOINT_AUTO_COMMIT_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_SOLRS_ENDPOINT_AUTO_COMMIT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRS_ENDPOINT_AUTO_COMMIT_DOC);
         conf.define(CAMEL_SINK_SOLRS_ENDPOINT_CONNECTION_TIMEOUT_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRS_ENDPOINT_CONNECTION_TIMEOUT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRS_ENDPOINT_CONNECTION_TIMEOUT_DOC);
         conf.define(CAMEL_SINK_SOLRS_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRS_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRS_ENDPOINT_DEFAULT_MAX_CONNECTIONS_PER_HOST_DOC);
         conf.define(CAMEL_SINK_SOLRS_ENDPOINT_FOLLOW_REDIRECTS_CONF, ConfigDef.Type.STRING, CAMEL_SINK_SOLRS_ENDPOINT_FOLLOW_REDIRECTS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_SOLRS_ENDPOINT_FOLLOW_REDIRECTS_DOC);
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-sink.json b/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-sink.json
index f3b6c2f..f63a946 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-sink.json
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-sink.json
@@ -11,7 +11,7 @@
 	"properties": {
 		"camel.sink.path.timelineType": {
 			"name": "camel.sink.path.timelineType",
-			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]",
+			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]",
 			"priority": "HIGH",
 			"required": "true",
 			"enum": [
@@ -20,9 +20,16 @@
 				"USER",
 				"MENTIONS",
 				"RETWEETSOFME",
+				"LIST",
 				"UNKNOWN"
 			]
 		},
+		"camel.sink.endpoint.list": {
+			"name": "camel.sink.endpoint.list",
+			"description": "The list name when using timelineType=list",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.user": {
 			"name": "camel.sink.endpoint.user",
 			"description": "The username when using timelineType=user",
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-source.json b/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-source.json
index f2464ea..db78afa 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-source.json
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/generated/resources/camel-twitter-timeline-source.json
@@ -11,7 +11,7 @@
 	"properties": {
 		"camel.source.path.timelineType": {
 			"name": "camel.source.path.timelineType",
-			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]",
+			"description": "The timeline type to produce\/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]",
 			"priority": "HIGH",
 			"required": "true",
 			"enum": [
@@ -20,9 +20,16 @@
 				"USER",
 				"MENTIONS",
 				"RETWEETSOFME",
+				"LIST",
 				"UNKNOWN"
 			]
 		},
+		"camel.source.endpoint.list": {
+			"name": "camel.source.endpoint.list",
+			"description": "The list name when using timelineType=list",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.user": {
 			"name": "camel.source.endpoint.user",
 			"description": "The username when using timelineType=user",
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-sink-connector.adoc b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-sink-connector.adoc
index 83ff9ea..c952438 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-sink-connector.adoc
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-sink-connector.adoc
@@ -24,14 +24,15 @@ connector.class=org.apache.camel.kafkaconnector.twittertimeline.CamelTwittertime
 ----
 
 
-The camel-twitter-timeline sink connector supports 21 options, which are listed below.
+The camel-twitter-timeline sink connector supports 22 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,1,1",options="header"]
 |===
 | Name | Description | Default | Required | Priority
-| *camel.sink.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN] | null | true | HIGH
+| *camel.sink.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN] | null | true | HIGH
+| *camel.sink.endpoint.list* | The list name when using timelineType=list | null | false | MEDIUM
 | *camel.sink.endpoint.user* | The username when using timelineType=user | null | false | MEDIUM
 | *camel.sink.endpoint.lazyStartProducer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then cre [...]
 | *camel.sink.endpoint.httpProxyHost* | The http proxy host which can be used for the camel-twitter. Can also be configured on the TwitterComponent level instead. | null | false | MEDIUM
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-source-connector.adoc b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-source-connector.adoc
index 9ce8e99..9f9806f 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-source-connector.adoc
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/camel-twitter-timeline-kafka-source-connector.adoc
@@ -24,14 +24,15 @@ connector.class=org.apache.camel.kafkaconnector.twittertimeline.CamelTwittertime
 ----
 
 
-The camel-twitter-timeline source connector supports 53 options, which are listed below.
+The camel-twitter-timeline source connector supports 54 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,1,1",options="header"]
 |===
 | Name | Description | Default | Required | Priority
-| *camel.source.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN] | null | true | HIGH
+| *camel.source.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN] | null | true | HIGH
+| *camel.source.endpoint.list* | The list name when using timelineType=list | null | false | MEDIUM
 | *camel.source.endpoint.user* | The username when using timelineType=user | null | false | MEDIUM
 | *camel.source.endpoint.bridgeErrorHandler* | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | false | MEDIUM
 | *camel.source.endpoint.sendEmptyMessageWhenIdle* | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | false | MEDIUM
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSinkConnector.properties b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSinkConnector.properties
index 88c4698..8b726ea 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSinkConnector.properties
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSinkConnector.properties
@@ -28,6 +28,6 @@ topics=
 
 # mandatory properties (for a complete properties list see the connector documentation):
 
-# The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]
+# The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]
 camel.sink.path.timelineType=
 
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSourceConnector.properties b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSourceConnector.properties
index af49fdc..9e5837d 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSourceConnector.properties
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/main/docs/examples/CamelTwittertimelineSourceConnector.properties
@@ -28,6 +28,6 @@ topics=
 
 # mandatory properties (for a complete properties list see the connector documentation):
 
-# The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]
+# The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]
 camel.source.path.timelineType=
 
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSinkConnectorConfig.java b/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSinkConnectorConfig.java
index fa2d5b4..1df250d 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSinkConnectorConfig.java
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSinkConnectorConfig.java
@@ -27,8 +27,11 @@ public class CamelTwittertimelineSinkConnectorConfig
             CamelSinkConnectorConfig {
 
     public static final String CAMEL_SINK_TWITTERTIMELINE_PATH_TIMELINE_TYPE_CONF = "camel.sink.path.timelineType";
-    public static final String CAMEL_SINK_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DOC = "The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]";
+    public static final String CAMEL_SINK_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DOC = "The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]";
     public static final String CAMEL_SINK_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DEFAULT = null;
+    public static final String CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LIST_CONF = "camel.sink.endpoint.list";
+    public static final String CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LIST_DOC = "The list name when using timelineType=list";
+    public static final String CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LIST_DEFAULT = null;
     public static final String CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_USER_CONF = "camel.sink.endpoint.user";
     public static final String CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_USER_DOC = "The username when using timelineType=user";
     public static final String CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_USER_DEFAULT = null;
@@ -104,6 +107,7 @@ public class CamelTwittertimelineSinkConnectorConfig
     public static ConfigDef conf() {
         ConfigDef conf = new ConfigDef(CamelSinkConnectorConfig.conf());
         conf.define(CAMEL_SINK_TWITTERTIMELINE_PATH_TIMELINE_TYPE_CONF, ConfigDef.Type.STRING, CAMEL_SINK_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SINK_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DOC);
+        conf.define(CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LIST_CONF, ConfigDef.Type.STRING, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LIST_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LIST_DOC);
         conf.define(CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_USER_CONF, ConfigDef.Type.STRING, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_USER_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_USER_DOC);
         conf.define(CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LAZY_START_PRODUCER_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LAZY_START_PRODUCER_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_LAZY_START_PRODUCER_DOC);
         conf.define(CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_HTTP_PROXY_HOST_CONF, ConfigDef.Type.STRING, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_HTTP_PROXY_HOST_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_TWITTERTIMELINE_ENDPOINT_HTTP_PROXY_HOST_DOC);
diff --git a/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSourceConnectorConfig.java b/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSourceConnectorConfig.java
index 465fa3e..15df12a 100644
--- a/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSourceConnectorConfig.java
+++ b/connectors/camel-twitter-timeline-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/twittertimeline/CamelTwittertimelineSourceConnectorConfig.java
@@ -27,8 +27,11 @@ public class CamelTwittertimelineSourceConnectorConfig
             CamelSourceConnectorConfig {
 
     public static final String CAMEL_SOURCE_TWITTERTIMELINE_PATH_TIMELINE_TYPE_CONF = "camel.source.path.timelineType";
-    public static final String CAMEL_SOURCE_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DOC = "The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN]";
+    public static final String CAMEL_SOURCE_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DOC = "The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN]";
     public static final String CAMEL_SOURCE_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DEFAULT = null;
+    public static final String CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_LIST_CONF = "camel.source.endpoint.list";
+    public static final String CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_LIST_DOC = "The list name when using timelineType=list";
+    public static final String CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_LIST_DEFAULT = null;
     public static final String CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_USER_CONF = "camel.source.endpoint.user";
     public static final String CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_USER_DOC = "The username when using timelineType=user";
     public static final String CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_USER_DEFAULT = null;
@@ -200,6 +203,7 @@ public class CamelTwittertimelineSourceConnectorConfig
     public static ConfigDef conf() {
         ConfigDef conf = new ConfigDef(CamelSourceConnectorConfig.conf());
         conf.define(CAMEL_SOURCE_TWITTERTIMELINE_PATH_TIMELINE_TYPE_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_SOURCE_TWITTERTIMELINE_PATH_TIMELINE_TYPE_DOC);
+        conf.define(CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_LIST_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_LIST_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_LIST_DOC);
         conf.define(CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_USER_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_USER_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_USER_DOC);
         conf.define(CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_BRIDGE_ERROR_HANDLER_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_BRIDGE_ERROR_HANDLER_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_BRIDGE_ERROR_HANDLER_DOC);
         conf.define(CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_SEND_EMPTY_MESSAGE_WHEN_IDLE_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_SEND_EMPTY_MESSAGE_WHEN_IDLE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_TWITTERTIMELINE_ENDPOINT_SEND_EMPTY_MESSAGE_WHEN_IDLE_DOC);
diff --git a/docs/modules/ROOT/pages/connectors/camel-cql-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-cql-kafka-sink-connector.adoc
index f7ec7a5..016d380 100644
--- a/docs/modules/ROOT/pages/connectors/camel-cql-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-cql-kafka-sink-connector.adoc
@@ -32,8 +32,8 @@ The camel-cql sink connector supports 17 options, which are listed below.
 |===
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.beanRef* | beanRef is defined using bean:id | null | false | MEDIUM
-| *camel.sink.path.hosts* | Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
-| *camel.sink.path.port* | Port number of cassansdra server(s) | null | false | MEDIUM
+| *camel.sink.path.hosts* | Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
+| *camel.sink.path.port* | Port number of Cassandra server(s) | null | false | MEDIUM
 | *camel.sink.path.keyspace* | Keyspace to use | null | false | MEDIUM
 | *camel.sink.endpoint.clusterName* | Cluster name | null | false | MEDIUM
 | *camel.sink.endpoint.consistencyLevel* | Consistency level to use One of: [ANY] [ONE] [TWO] [THREE] [QUORUM] [ALL] [LOCAL_ONE] [LOCAL_QUORUM] [EACH_QUORUM] [SERIAL] [LOCAL_SERIAL] | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-cql-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-cql-kafka-source-connector.adoc
index fbf7210..23ae1a6 100644
--- a/docs/modules/ROOT/pages/connectors/camel-cql-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-cql-kafka-source-connector.adoc
@@ -32,8 +32,8 @@ The camel-cql source connector supports 35 options, which are listed below.
 |===
 | Name | Description | Default | Required | Priority
 | *camel.source.path.beanRef* | beanRef is defined using bean:id | null | false | MEDIUM
-| *camel.source.path.hosts* | Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
-| *camel.source.path.port* | Port number of cassansdra server(s) | null | false | MEDIUM
+| *camel.source.path.hosts* | Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma. | null | false | MEDIUM
+| *camel.source.path.port* | Port number of Cassandra server(s) | null | false | MEDIUM
 | *camel.source.path.keyspace* | Keyspace to use | null | false | MEDIUM
 | *camel.source.endpoint.clusterName* | Cluster name | null | false | MEDIUM
 | *camel.source.endpoint.consistencyLevel* | Consistency level to use One of: [ANY] [ONE] [TWO] [THREE] [QUORUM] [ALL] [LOCAL_ONE] [LOCAL_QUORUM] [EACH_QUORUM] [SERIAL] [LOCAL_SERIAL] | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-solr-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-solr-kafka-sink-connector.adoc
index 7d95c36..3f2aff1 100644
--- a/docs/modules/ROOT/pages/connectors/camel-solr-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-solr-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.solr.CamelSolrSinkConnector
 ----
 
 
-The camel-solr sink connector supports 18 options, which are listed below.
+The camel-solr sink connector supports 19 options, which are listed below.
 
 
 
@@ -33,6 +33,7 @@ The camel-solr sink connector supports 18 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.url* | Hostname and port for the solr server | null | true | HIGH
 | *camel.sink.endpoint.allowCompression* | Server side must support gzip or deflate for this to have any effect | null | false | MEDIUM
+| *camel.sink.endpoint.autoCommit* | If true, each producer operation will be committed automatically | false | false | MEDIUM
 | *camel.sink.endpoint.connectionTimeout* | connectionTimeout on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.defaultMaxConnectionsPerHost* | maxConnectionsPerHost on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.followRedirects* | indicates whether redirects are used to get to the Solr server | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-solrCloud-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-solrCloud-kafka-sink-connector.adoc
index 765b2a7..9a3849b 100644
--- a/docs/modules/ROOT/pages/connectors/camel-solrCloud-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-solrCloud-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.solrcloud.CamelSolrcloudSinkConn
 ----
 
 
-The camel-solrCloud sink connector supports 18 options, which are listed below.
+The camel-solrCloud sink connector supports 19 options, which are listed below.
 
 
 
@@ -33,6 +33,7 @@ The camel-solrCloud sink connector supports 18 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.url* | Hostname and port for the solr server | null | true | HIGH
 | *camel.sink.endpoint.allowCompression* | Server side must support gzip or deflate for this to have any effect | null | false | MEDIUM
+| *camel.sink.endpoint.autoCommit* | If true, each producer operation will be committed automatically | false | false | MEDIUM
 | *camel.sink.endpoint.connectionTimeout* | connectionTimeout on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.defaultMaxConnectionsPerHost* | maxConnectionsPerHost on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.followRedirects* | indicates whether redirects are used to get to the Solr server | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-solrs-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-solrs-kafka-sink-connector.adoc
index 2fdc077..659a831 100644
--- a/docs/modules/ROOT/pages/connectors/camel-solrs-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-solrs-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.solrs.CamelSolrsSinkConnector
 ----
 
 
-The camel-solrs sink connector supports 18 options, which are listed below.
+The camel-solrs sink connector supports 19 options, which are listed below.
 
 
 
@@ -33,6 +33,7 @@ The camel-solrs sink connector supports 18 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.url* | Hostname and port for the solr server | null | true | HIGH
 | *camel.sink.endpoint.allowCompression* | Server side must support gzip or deflate for this to have any effect | null | false | MEDIUM
+| *camel.sink.endpoint.autoCommit* | If true, each producer operation will be committed automatically | false | false | MEDIUM
 | *camel.sink.endpoint.connectionTimeout* | connectionTimeout on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.defaultMaxConnectionsPerHost* | maxConnectionsPerHost on the underlying HttpConnectionManager | null | false | MEDIUM
 | *camel.sink.endpoint.followRedirects* | indicates whether redirects are used to get to the Solr server | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-sink-connector.adoc
index 83ff9ea..c952438 100644
--- a/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-sink-connector.adoc
@@ -24,14 +24,15 @@ connector.class=org.apache.camel.kafkaconnector.twittertimeline.CamelTwittertime
 ----
 
 
-The camel-twitter-timeline sink connector supports 21 options, which are listed below.
+The camel-twitter-timeline sink connector supports 22 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,1,1",options="header"]
 |===
 | Name | Description | Default | Required | Priority
-| *camel.sink.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN] | null | true | HIGH
+| *camel.sink.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN] | null | true | HIGH
+| *camel.sink.endpoint.list* | The list name when using timelineType=list | null | false | MEDIUM
 | *camel.sink.endpoint.user* | The username when using timelineType=user | null | false | MEDIUM
 | *camel.sink.endpoint.lazyStartProducer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then cre [...]
 | *camel.sink.endpoint.httpProxyHost* | The http proxy host which can be used for the camel-twitter. Can also be configured on the TwitterComponent level instead. | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-source-connector.adoc
index 9ce8e99..9f9806f 100644
--- a/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-twitter-timeline-kafka-source-connector.adoc
@@ -24,14 +24,15 @@ connector.class=org.apache.camel.kafkaconnector.twittertimeline.CamelTwittertime
 ----
 
 
-The camel-twitter-timeline source connector supports 53 options, which are listed below.
+The camel-twitter-timeline source connector supports 54 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,1,1",options="header"]
 |===
 | Name | Description | Default | Required | Priority
-| *camel.source.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [UNKNOWN] | null | true | HIGH
+| *camel.source.path.timelineType* | The timeline type to produce/consume. One of: [PUBLIC] [HOME] [USER] [MENTIONS] [RETWEETSOFME] [LIST] [UNKNOWN] | null | true | HIGH
+| *camel.source.endpoint.list* | The list name when using timelineType=list | null | false | MEDIUM
 | *camel.source.endpoint.user* | The username when using timelineType=user | null | false | MEDIUM
 | *camel.source.endpoint.bridgeErrorHandler* | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | false | MEDIUM
 | *camel.source.endpoint.sendEmptyMessageWhenIdle* | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | false | MEDIUM

[camel-kafka-connector] 01/02: Related to #423 : caonverted source and sink to use camel-kamelets.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ae61051d5120acc595afe7f489faf52cef4dfd4f
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Apr 26 01:38:42 2021 +0200

    Related to #423 : caonverted source and sink to use camel-kamelets.
---
 core/pom.xml                                       |  10 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   5 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   5 +-
 .../utils/CamelKafkaConnectMain.java               | 272 ++++++++++++---------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  10 +-
 .../camel/kafkaconnector/DataFormatTest.java       |  10 +-
 6 files changed, 191 insertions(+), 121 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 4100f6a..6896ab1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -53,12 +53,20 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-kafka</artifactId>
+            <artifactId>camel-kamelet</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-languages</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-xml-jaxb</artifactId>
+        </dependency>
 
         <!-- Tools -->
         <dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82c16d2..4e5a201 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -42,6 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelSinkTask extends SinkTask {
+    public static final String KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSink.";
+
     public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key";
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -119,8 +121,9 @@ public class CamelSinkTask extends SinkTask {
                                                 CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
+            actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl)
+            cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink")
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 00ce145..77ce636 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 
 public class CamelSourceTask extends SourceTask {
+    public static final String KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSource.";
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
 
@@ -145,8 +146,9 @@ public class CamelSourceTask extends SourceTask {
                                                 config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
+            actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl)
+            cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl)
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
@@ -171,6 +173,7 @@ public class CamelSourceTask extends SourceTask {
             consumer.start();
 
             cms.start();
+
             LOG.info("CamelSourceTask connector task started");
         } catch (Exception e) {
             throw new ConnectException("Failed to create and start Camel context", e);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index d031b20..6e7dbdf 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -26,10 +26,16 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.DefaultErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.builder.NoErrorHandlerBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.CamelSourceTask;
 import org.apache.camel.main.SimpleMain;
-import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -40,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelKafkaConnectMain extends SimpleMain {
-    public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
     private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
 
     protected volatile ConsumerTemplate consumerTemplate;
@@ -140,67 +145,67 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.aggregationTimeout = aggregationTimeout;
             return this;
         }
-        
+
         public Builder withErrorHandler(String errorHandler) {
             this.errorHandler = errorHandler;
             return this;
         }
-        
+
         public Builder withMaxRedeliveries(int maxRedeliveries) {
             this.maxRedeliveries = maxRedeliveries;
             return this;
         }
-        
+
         public Builder withRedeliveryDelay(long redeliveryDelay) {
             this.redeliveryDelay = redeliveryDelay;
             return this;
         }
-        
+
         public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
             this.idempotencyEnabled = idempotencyEnabled;
             return this;
         }
-        
+
         public Builder withExpressionType(String expressionType) {
             this.expressionType = expressionType;
             return this;
         }
-        
+
         public Builder withExpressionHeader(String expressionHeader) {
             this.expressionHeader = expressionHeader;
             return this;
         }
-        
+
         public Builder withMemoryDimension(int memoryDimension) {
             this.memoryDimension = memoryDimension;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryType(String idempotentRepositoryType) {
             this.idempotentRepositoryType = idempotentRepositoryType;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) {
             this.idempotentRepositoryTopicName = idempotentRepositoryTopicName;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) {
             this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaMaxCacheSize(int idempotentRepositoryKafkaMaxCacheSize) {
             this.idempotentRepositoryKafkaMaxCacheSize = idempotentRepositoryKafkaMaxCacheSize;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaPollDuration(int idempotentRepositoryKafkaPollDuration) {
             this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration;
             return this;
         }
-        
+
         public Builder withHeadersExcludePattern(String headersExcludePattern) {
             this.headersExcludePattern = headersExcludePattern;
             return this;
@@ -214,21 +219,51 @@ public class CamelKafkaConnectMain extends SimpleMain {
             return entry.getKey() + "=" + entry.getValue();
         }
 
-
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
             camelMain.configure().setAutoConfigurationLogSummary(false);
+            //TODO: make it configurable
+            camelMain.configure().setDumpRoutes(true);
 
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
-            List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//            //dataformats
+//            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+//            }
+//            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+//            }
 
-            LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
-            camelMain.setInitialProperties(camelProperties);
-            
-            // Instantianting the idempotent Repository here and inject it in registry to be referenced
+            //aggregator
+            if (!ObjectHelper.isEmpty(aggregationSize)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+            }
+            if (!ObjectHelper.isEmpty(aggregationTimeout)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+            }
+
+            //idempotency
             if (idempotencyEnabled) {
+                switch (expressionType) {
+                    case "body":
+                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        break;
+                    case "header":
+                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        break;
+                    default:
+                        break;
+                }
+                // Instantiating the idempotent Repository here and inject it in registry to be referenced
                 IdempotentRepository idempotentRepo = null;
                 switch (idempotentRepositoryType) {
                     case "memory":
@@ -240,110 +275,123 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     default:
                         break;
                 }
-                camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
+                camelMain.getCamelContext().getRegistry().bind("ckcIdempotentRepository", idempotentRepo);
+            }
+
+            //remove headers
+            if (!ObjectHelper.isEmpty(headersExcludePattern)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+            }
+
+            List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+            LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
+            camelMain.setInitialProperties(camelProperties);
+
+            //error handler
+            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
+            if (errorHandler != null) {
+                switch (errorHandler) {
+                    case "no":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
+                        break;
+                    case "default":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+                        break;
+                    default:
+                        break;
+                }
             }
 
-            //creating the actual route
             camelMain.configure().addRoutesBuilder(new RouteBuilder() {
                 public void configure() {
-                    //from
-                    RouteDefinition rd = from(from);
-                    LOG.info("Creating Camel route from({})", from);
-                    
-                    if (!ObjectHelper.isEmpty(errorHandler)) {
-                        switch (errorHandler) {
-                            case "no":
-                                rd.errorHandler(noErrorHandler());
-                                break;
-                            case "default":
-                                rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
-                                break;
-                            default:
-                                break;
-                        }
+
+                    //creating source template
+                    RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
+                            .templateParameter("fromUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//                            .templateParameter("marshall", "dummyDataformat")
+//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+                            .templateParameter("aggregationSize", "1")
+                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+                            .templateParameter("idempotentExpression", "dummyExpression")
+                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+                            .templateParameter("headersExcludePattern", "(?!)");
+
+
+                    ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
+                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                        rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat);
+                    }
+                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                        rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat);
                     }
 
-                    //dataformats
+                    if (getContext().getRegistry().lookupByName("aggregate") != null) {
+                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
+                        rdInTemplateSource = rdInTemplateSource.aggregate(s)
+                                .constant(true)
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}");
+                    }
+
+                    if (idempotencyEnabled) {
+                        rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                    }
+
+                    rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
+                            .to("kamelet:sink");
+
+                    //creating sink template
+                    RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
+                            .templateParameter("toUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//                            .templateParameter("marshall", "dummyDataformat")
+//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+                            .templateParameter("aggregationSize", "1")
+                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+                            .templateParameter("idempotentExpression", "dummyExpression")
+                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+                            .templateParameter("headersExcludePattern", "(?!)");
+
+
+                    ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        LOG.info(".marshal({})", marshallDataFormat);
-                        rd.marshal(marshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat);
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        LOG.info(".unmarshal({})", unmarshallDataFormat);
-                        rd.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat);
                     }
+
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                        //aggregation
                         AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        if (idempotencyEnabled) {
-                            switch (expressionType) {
-                                case "body":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + "
-                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
-                                    LOG.info(".to({})", to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                case "header":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + "
-                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
-                                    LOG.info(".to({})", to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                default:
-                                    break;
-                            }
-                        } else {
-                            LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
-                            LOG.info(".to({})", to);
-                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
-                            } else {
-                                rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to);
-                            }
-                        }
-                    } else {
-                        if (idempotencyEnabled) {
-                            switch (expressionType) {
-                                case "body":
-                                    LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                case "header":
-                                    LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                default:
-                                    break;
-                            }
-                        } else {
-                            //to
-                            LOG.info(".to({})", to);
-                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                rd.toD(to);
-                            } else {
-                                rd.removeHeaders(headersExcludePattern).toD(to);
-                            }
-                        }
+                        rdInTemplateSink = rdInTemplateSink.aggregate(s)
+                                .constant(true)
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}");
                     }
+
+                    if (idempotencyEnabled) {
+                        rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                    }
+
+                    rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
+                            .to("{{toUrl}}");
+
+                    //creating the actual route
+                    from(from).toD(to);
                 }
             });
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 5c99ad0..b1271ac 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CamelSourceTaskTest {
-
     private static final String DIRECT_URI = "direct:start";
     private static final String TOPIC_NAME = "my-topic";
 
@@ -225,7 +224,7 @@ public class CamelSourceTaskTest {
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() {
+    public void testUrlPrecedenceOnComponentProperty() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "timer:foo?period=10&repeatCount=2");
@@ -236,7 +235,8 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
+
 
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
@@ -261,10 +261,10 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
 
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
-                .filter(e -> e.getEndpointUri().startsWith("direct"))
+                .filter(e -> e.getEndpointUri().startsWith("seda"))
                 .forEach(e -> {
                     assertTrue(e.getEndpointUri().contains("end"));
                     assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 6715843..c3d26a4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
@@ -24,6 +26,7 @@ import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -66,8 +69,13 @@ public class DataFormatTest {
         props.put("camel.sink.marshal", "missingDataformat");
 
         CamelSinkTask camelsinkTask = new CamelSinkTask();
-        assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
+        camelsinkTask.start(props);
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
+        records.add(record);
+        assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
         // No need to check the stop method. The error is already thrown/caught during startup.
+        camelsinkTask.stop();
     }
 
     @Test