Posted to commits@camel.apache.org by va...@apache.org on 2021/03/11 09:05:46 UTC

[camel-kafka-connector] branch camel-master updated (cc49c17 -> a85a209)

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a change to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from cc49c17  [create-pull-request] automated change
     new f315719  fixed #980 : camel.source.contentLogLevel config not honored in source connectors
     new dff4d98  Fixed flaky hdfs itest.
     new ef0eb99  Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202
     new 290f065  fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string.
     new 99f11df  Fixed itest for netty-http.
     new c962725  chore: fix checkstyle.
     new 3861245  Some follow-up improvements for #969 and #202.
     new e806401  Adjusted catalog tests for camel 3.9.0
     new a85a209  chore: regen

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resources/connectors/camel-aws2-s3-sink.json   |   4 +-
 .../resources/connectors/camel-aws2-s3-source.json |   4 +-
 .../resources/connectors/camel-aws2-sns-sink.json  |   4 +-
 .../resources/connectors/camel-aws2-sqs-sink.json  |   4 +-
 .../connectors/camel-aws2-sqs-source.json          |   4 +-
 .../connectors/camel-netty-http-sink.json          |  14 +++
 .../connectors/camel-netty-http-source.json        |  14 +++
 .../resources/connectors/camel-netty-sink.json     |  14 +++
 .../resources/connectors/camel-netty-source.json   |  14 +++
 .../connectors/camel-spring-rabbitmq-source.json   |  47 ++++++++
 .../catalog/CamelKafkaConnectorCatalogTest.java    |   2 +-
 .../generated/resources/camel-aws2-s3-sink.json    |   4 +-
 .../generated/resources/camel-aws2-s3-source.json  |   4 +-
 .../docs/camel-aws2-s3-kafka-sink-connector.adoc   |   4 +-
 .../docs/camel-aws2-s3-kafka-source-connector.adoc |   4 +-
 .../aws2s3/CamelAws2s3SinkConnectorConfig.java     |   4 +-
 .../aws2s3/CamelAws2s3SourceConnectorConfig.java   |   4 +-
 .../generated/resources/camel-aws2-sns-sink.json   |   4 +-
 .../docs/camel-aws2-sns-kafka-sink-connector.adoc  |   4 +-
 .../aws2sns/CamelAws2snsSinkConnectorConfig.java   |   4 +-
 .../generated/resources/camel-aws2-sqs-sink.json   |   4 +-
 .../generated/resources/camel-aws2-sqs-source.json |   4 +-
 .../docs/camel-aws2-sqs-kafka-sink-connector.adoc  |   4 +-
 .../camel-aws2-sqs-kafka-source-connector.adoc     |   4 +-
 .../aws2sqs/CamelAws2sqsSinkConnectorConfig.java   |   4 +-
 .../aws2sqs/CamelAws2sqsSourceConnectorConfig.java |   4 +-
 .../generated/resources/camel-netty-http-sink.json |  14 +++
 .../resources/camel-netty-http-source.json         |  14 +++
 .../camel-netty-http-kafka-sink-connector.adoc     |   4 +-
 .../camel-netty-http-kafka-source-connector.adoc   |   4 +-
 .../CamelNettyhttpSinkConnectorConfig.java         |   8 ++
 .../CamelNettyhttpSourceConnectorConfig.java       |   8 ++
 .../src/generated/resources/camel-netty-sink.json  |  14 +++
 .../generated/resources/camel-netty-source.json    |  14 +++
 .../docs/camel-netty-kafka-sink-connector.adoc     |   4 +-
 .../docs/camel-netty-kafka-source-connector.adoc   |   4 +-
 .../netty/CamelNettySinkConnectorConfig.java       |   8 ++
 .../netty/CamelNettySourceConnectorConfig.java     |   8 ++
 .../resources/camel-spring-rabbitmq-source.json    |  47 ++++++++
 ...mel-spring-rabbitmq-kafka-source-connector.adoc |   8 +-
 .../CamelSpringrabbitmqSourceConnectorConfig.java  |  24 +++++
 core/pom.xml                                       |  19 +++-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  14 ++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |   5 +
 .../camel/kafkaconnector/CamelSourceRecord.java    |  59 ++++++++++
 .../camel/kafkaconnector/CamelSourceTask.java      |  99 +++++++++++++----
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  13 +++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  62 +++++++++++
 .../CamelTypeConverterTransformTest.java           |  24 +++++
 docs/modules/ROOT/nav.adoc                         |  50 +++++++++
 docs/modules/ROOT/pages/connectors.adoc            |  24 ++++-
 .../camel-aws2-s3-kafka-sink-connector.adoc        |   4 +-
 .../camel-aws2-s3-kafka-source-connector.adoc      |   4 +-
 .../camel-aws2-sns-kafka-sink-connector.adoc       |   4 +-
 .../camel-aws2-sqs-kafka-sink-connector.adoc       |   4 +-
 .../camel-aws2-sqs-kafka-source-connector.adoc     |   4 +-
 .../camel-netty-http-kafka-sink-connector.adoc     |   4 +-
 .../camel-netty-http-kafka-source-connector.adoc   |   4 +-
 .../camel-netty-kafka-sink-connector.adoc          |   4 +-
 .../camel-netty-kafka-source-connector.adoc        |   4 +-
 ...mel-spring-rabbitmq-kafka-source-connector.adoc |   8 +-
 parent/pom.xml                                     |   9 ++
 .../hdfs/sink/CamelSinkHDFSITCase.java             |  19 ++--
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  |  26 ++++-
 tests/{itests-sql => itests-netty-http}/pom.xml    |  27 ++---
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java}  |  71 ++++++------
 .../source/CamelNettyHTTPPropertyFactory.java}     |  41 ++++---
 .../source/CamelSourceNettyHTTPITCase.java         | 119 +++++++++++++++++++++
 68 files changed, 927 insertions(+), 166 deletions(-)
 create mode 100644 core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
 copy tests/{itests-sql => itests-netty-http}/pom.xml (81%)
 copy tests/{itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java => itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java} (60%)
 copy tests/{itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSyslogPropertyFactory.java => itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java} (51%)
 create mode 100644 tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java


[camel-kafka-connector] 07/09: Some follow-up improvements for #969 and #202.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3861245678539663d5ee16149f955810ed3a45ac
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Mar 8 15:57:09 2021 +0100

    Some follow-up improvements for #969 and #202.
---
 .../camel/kafkaconnector/CamelSourceTask.java      | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 51b055d..00ce145 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -219,11 +218,6 @@ public class CamelSourceTask extends SourceTask {
                 StreamCache sc = (StreamCache) messageBodyValue;
                 // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
                 sc.reset();
-                try {
-                    messageBodyValue = sc.copy(exchange);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
             }
             for (String singleTopic : topics) {
                 CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
@@ -256,15 +250,23 @@ public class CamelSourceTask extends SourceTask {
     }
 
     @Override
-    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
+        LOG.debug("Committing record: {} with metadata: {}", record, metadata);
         ///XXX: this should be a safe cast please see: https://issues.apache.org/jira/browse/KAFKA-12391
         Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck();
         LOG.debug("Committing record with claim check number: {}", claimCheck);
         Exchange correlatedExchange = exchangesWaitingForAck[claimCheck];
-        exchangesWaitingForAck[claimCheck] = null;
-        freeSlots.add(claimCheck);
-        UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
-        LOG.debug("Record with claim check number: {} committed.", claimCheck);
+        try {
+            UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
+            LOG.debug("Record with claim check number: {} committed.", claimCheck);
+        } catch (Throwable t) {
+            LOG.error("Exception during Unit Of Work completion: {} caused by: {}", t.getMessage(), t.getCause());
+            throw new RuntimeException(t);
+        } finally {
+            exchangesWaitingForAck[claimCheck] = null;
+            freeSlots.add(claimCheck);
+            LOG.debug("Claim check number: {} freed.", claimCheck);
+        }
     }
 
     @Override

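Taken together with commit 03/09 below, the change above means every record emitted by poll() borrows a claim-check slot that is handed back only once Kafka acknowledges the record in commitRecord(), and from this commit onwards the slot is handed back even when completing the unit of work throws. The following is a self-contained sketch of just that bookkeeping, using the same jctools SpscArrayQueue but with plain Object placeholders instead of the Camel and Kafka Connect types; it is illustrative only, not the connector code itself.

import org.jctools.queues.SpscArrayQueue;

public class ClaimCheckSketch {
    private final SpscArrayQueue<Integer> freeSlots;
    private final Object[] waitingForAck;

    public ClaimCheckSketch(int maxNotCommittedRecords) {
        freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords);
        // the queue rounds its capacity up to the next power of two, so size the slot array from it
        waitingForAck = new Object[freeSlots.capacity()];
        for (int i = 0; i < waitingForAck.length; i++) {
            freeSlots.add(i);
        }
    }

    // poll() side: borrow a slot and remember the in-flight exchange
    public int borrow(Object exchange) {
        int claimCheck = freeSlots.remove();
        waitingForAck[claimCheck] = exchange;
        return claimCheck;
    }

    // commitRecord() side: complete the exchange, then always hand the slot back
    public void release(int claimCheck, Runnable completeUnitOfWork) {
        try {
            completeUnitOfWork.run();
        } finally {
            waitingForAck[claimCheck] = null;   // freed even if the completion threw
            freeSlots.add(claimCheck);
        }
    }
}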

[camel-kafka-connector] 03/09: Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ef0eb99867e9d5954ab541677be3d19cdf8fa2a1
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:15:53 2021 +0100

    Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202
---
 core/pom.xml                                       | 19 ++++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 ++
 .../camel/kafkaconnector/CamelSourceRecord.java    | 43 ++++++++++
 .../camel/kafkaconnector/CamelSourceTask.java      | 96 +++++++++++++++++-----
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 50 +++++++++++
 .../CamelTypeConverterTransformTest.java           | 25 ++++++
 parent/pom.xml                                     |  9 ++
 7 files changed, 221 insertions(+), 26 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 2e32d16..f59f70d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-seda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-kafka</artifactId>
         </dependency>
         <dependency>
@@ -56,6 +60,13 @@
             <artifactId>camel-core-languages</artifactId>
         </dependency>
 
+        <!-- Tools -->
+        <dependency>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
+            <version>${version.jctools}</version>
+        </dependency>
+
         <!-- Kafka -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -108,22 +119,22 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-seda</artifactId>
+            <artifactId>camel-timer</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-timer</artifactId>
+            <artifactId>camel-log</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-log</artifactId>
+            <artifactId>camel-slack</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-slack</artifactId>
+            <artifactId>camel-netty-http</artifactId>
             <scope>test</scope>
         </dependency>
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index bb4f8f8..4acfa62 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -54,6 +54,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
     public static final String CAMEL_SOURCE_MAX_POLL_DURATION_CONF = "camel.source.maxPollDuration";
     public static final String CAMEL_SOURCE_MAX_POLL_DURATION_DOC = "The maximum time in milliseconds spent in a single call to poll()";
 
+    public static final Integer CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT = 1024;
+    public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF = "camel.source.maxNotCommittedRecords";
+    public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC = "The maximum number of non committed kafka connect records that can be tolerated before stop polling new records (rounded to the next power of 2) with a minimum of 4.";
+
     public static final Long CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT = 1000L;
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF = "camel.source.pollingConsumerQueueSize";
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC = "The queue size for the internal hand-off queue between the polling consumer, and producers sending data into the queue.";
@@ -82,6 +86,7 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
         .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+        .define(CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, Type.INT, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
new file mode 100644
index 0000000..87934ef
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
@@ -0,0 +1,43 @@
+package org.apache.camel.kafkaconnector;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Map;
+
+public class CamelSourceRecord extends SourceRecord {
+    private Integer claimCheck = null;
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, keySchema, key, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
+    }
+
+    public Integer getClaimCheck() {
+        return claimCheck;
+    }
+
+    public void setClaimCheck(Integer claimCheck) {
+        this.claimCheck = claimCheck;
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 16e6bfc..03d0c1a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,30 +16,37 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.StreamCache;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
 public class CamelSourceTask extends SourceTask {
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -49,18 +56,23 @@ public class CamelSourceTask extends SourceTask {
     private static final String CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX = "camel.source.endpoint.";
     private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path.";
 
-    private static final String LOCAL_URL = "direct:end";
+    private static final String LOCAL_URL = "seda:end";
 
     private CamelKafkaConnectMain cms;
     private PollingConsumer consumer;
     private String[] topics;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
+    private Integer maxNotCommittedRecords;
     private String camelMessageHeaderKey;
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
+    private Exchange[] exchangesWaitingForAck;
+    //the assumption is that at most 1 thread is running poll() method and at most 1 thread is running commitRecord()
+    private SpscArrayQueue<Integer> freeSlots;
     private boolean mapProperties;
     private boolean mapHeaders;
 
+
     @Override
     public String version() {
         return VersionUtil.getVersion();
@@ -82,6 +94,7 @@ public class CamelSourceTask extends SourceTask {
 
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
             maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
+            maxNotCommittedRecords = config.getInt(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF);
 
             camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
 
@@ -105,10 +118,24 @@ public class CamelSourceTask extends SourceTask {
             final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
             mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
-            
+
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
-            String localUrl = getLocalUrlWithPollingOptions(config);
+            long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
+            long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
+            boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+            String localUrl = getLocalUrlWithPollingOptions(pollingConsumerQueueSize, pollingConsumerBlockTimeout, pollingConsumerBlockWhenFull);
+
+            freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords);
+            freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() {
+                int i = 0;
+                @Override
+                public Integer get() {
+                    return i++;
+                }
+            });
+            //needs to be done like this because freeSlots capacity is rounded to the next power of 2 of maxNotCommittedRecords
+            exchangesWaitingForAck = new Exchange[freeSlots.capacity()];
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -155,13 +182,14 @@ public class CamelSourceTask extends SourceTask {
 
     @Override
     public synchronized List<SourceRecord> poll() {
+        LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
         final long startPollEpochMilli = Instant.now().toEpochMilli();
 
         long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
         List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+        while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
             Exchange exchange = consumer.receive(remaining);
             if (exchange == null) {
                 // Nothing received, abort and return what we received so far
@@ -177,31 +205,46 @@ public class CamelSourceTask extends SourceTask {
             Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
             final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-            final Object messageBodyValue = exchange.getMessage().getBody();
+            Object messageBodyValue = exchange.getMessage().getBody();
 
             final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
             final long timestamp = calculateTimestamp(exchange);
 
+            // take in account Cached camel streams
+            if (messageBodyValue instanceof StreamCache) {
+                StreamCache sc = (StreamCache) messageBodyValue;
+                // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
+                sc.reset();
+                try {
+                    messageBodyValue = sc.copy(exchange);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
             for (String singleTopic : topics) {
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
+                CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
                         messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
 
                 if (mapHeaders) {
                     if (exchange.getMessage().hasHeaders()) {
-                        setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                        setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                     }
                 }
                 
                 if (mapProperties) {
                     if (exchange.hasProperties()) {
-                        setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                        setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
                     }
                 }
 
-                TaskHelper.logRecordContent(LOG, loggingLevel, record);
-                records.add(record);
+                TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);
+                Integer claimCheck = freeSlots.remove();
+                camelRecord.setClaimCheck(claimCheck);
+                exchangesWaitingForAck[claimCheck] = exchange;
+                LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", camelRecord, exchange, claimCheck);
+                records.add(camelRecord);
             }
             collectedRecords++;
             remaining = remaining(startPollEpochMilli, maxPollDuration);
@@ -211,6 +254,18 @@ public class CamelSourceTask extends SourceTask {
     }
 
     @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
+        ///XXX: this should be a safe cast please see: https://issues.apache.org/jira/browse/KAFKA-12391
+        Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck();
+        LOG.debug("Committing record with claim check number: {}", claimCheck);
+        Exchange correlatedExchange = exchangesWaitingForAck[claimCheck];
+        exchangesWaitingForAck[claimCheck] = null;
+        freeSlots.add(claimCheck);
+        UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
+        LOG.debug("Record with claim check number: {} committed.", claimCheck);
+    }
+
+    @Override
     public void stop() {
         LOG.info("Stopping CamelSourceTask connector task");
         try {
@@ -301,10 +356,7 @@ public class CamelSourceTask extends SourceTask {
         }
     }
 
-    private String getLocalUrlWithPollingOptions(CamelSourceConnectorConfig config) {
-        long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
-        long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
-        boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+    private String getLocalUrlWithPollingOptions(long pollingConsumerQueueSize, long pollingConsumerBlockTimeout, boolean pollingConsumerBlockWhenFull) {
         return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout
                + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 21d56fc..51b4db3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,15 +16,19 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.awt.print.PrinterJob;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
@@ -77,6 +81,24 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourcePollingMaxNotCommittedRecords() {
+        final long size = 4;
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, String.valueOf(size));
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+
+        sendBatchOfRecords(sourceTask, size + 1);
+        List<SourceRecord> poll = sourceTask.poll();
+
+        assertEquals(4, poll.size());
+        sourceTask.stop();
+    }
+
+    @Test
     public void testSourcePollingMaxBatchPollSize() {
         final long size = 2;
         Map<String, String> props = new HashMap<>();
@@ -621,4 +643,32 @@ public class CamelSourceTaskTest {
 
         sourceTask.stop();
     }
+
+    @Test
+    public void testRequestReply() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
+                String result = template.requestBody(DIRECT_URI, "test", String.class);
+                assertEquals("test", result);
+            }
+        });
+
+        List<SourceRecord> poll = sourceTask.poll();
+        assertEquals(1, poll.size());
+
+        sourceTask.commitRecord(poll.get(0), null);
+
+        sourceTask.stop();
+        executor.shutdown();
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index 92c668b..c6cecbf 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.kafkaconnector.transforms;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import io.netty.buffer.Unpooled;
+import org.apache.camel.component.netty.http.NettyChannelBufferStreamCache;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -63,6 +67,27 @@ public class CamelTypeConverterTransformTest {
     }
 
     @Test
+    public void testIfItConvertsNettyCorrectly() {
+        final String testMessage = "testMessage";
+        NettyChannelBufferStreamCache nettyTestValue = new NettyChannelBufferStreamCache(Unpooled.wrappedBuffer(testMessage.getBytes(Charset.defaultCharset())));
+
+        final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", Schema.BYTES_SCHEMA, nettyTestValue);
+
+        final Map<String, Object> propsForValueSmt = new HashMap<>();
+        propsForValueSmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.String");
+
+        final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+        transformationValue.configure(propsForValueSmt);
+
+        final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
+
+        assertEquals(java.lang.String.class, transformedValueSourceRecord.value().getClass());
+        assertEquals(Schema.STRING_SCHEMA, transformedValueSourceRecord.valueSchema());
+        assertEquals(testMessage, transformedValueSourceRecord.value());
+    }
+
+    @Test
     public void testIfHandlesTypeConvertersFromCamelComponents() {
         // we know we have a type converter from struct to map in dbz component, so we use this for testing
         final Schema schema = SchemaBuilder.struct()
diff --git a/parent/pom.xml b/parent/pom.xml
index 2249d43..4318c70 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -35,6 +35,7 @@
         <version.guava>20.0</version.guava>
         <version.javax.annotation-api>1.3.2</version.javax.annotation-api>
         <version.postgres>42.2.14</version.postgres>
+        <version.jctools>3.3.0</version.jctools>
 
         <version.maven.compiler>3.8.1</version.maven.compiler>
         <version.maven.javadoc>3.1.1</version.maven.javadoc>
@@ -54,6 +55,8 @@
         <version.maven.checkstyle.plugin>3.1.0</version.maven.checkstyle.plugin>
         <version.maven.surefire.plugin>3.0.0-M4</version.maven.surefire.plugin>
 
+        <!-- Note: we are deliberately overriding this one due to GH issue #990 -->
+        <testcontainers-version>1.15.2</testcontainers-version>
 
         <mycila-license-version>3.0</mycila-license-version>
         <gmavenplus-plugin-version>1.9.0</gmavenplus-plugin-version>
@@ -113,6 +116,12 @@
                 <version>${version.guava}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.jctools</groupId>
+                <artifactId>jctools-core</artifactId>
+                <version>${version.jctools}</version>
+            </dependency>
+
             <!--  Kafka dependencies -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>

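For reference, the new camel.source.maxNotCommittedRecords option introduced above is an ordinary connector property; a hedged example of starting a source task with it, in the same style as the CamelSourceTaskTest additions above (the topic name and endpoint URI below are made up for illustration):

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
import org.apache.camel.kafkaconnector.CamelSourceTask;

public class MaxNotCommittedRecordsExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(CamelSourceConnectorConfig.TOPIC_CONF, "mytopic");                 // hypothetical topic
        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "direct:start"); // hypothetical endpoint
        // tolerate roughly 1000 records waiting for a Kafka ack before poll() stops emitting new ones;
        // internally rounded up to the next power of two (1024), with a minimum of 4
        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, "1000");

        CamelSourceTask sourceTask = new CamelSourceTask();
        sourceTask.start(props);
        // poll() and commitRecord() would normally be driven by the Kafka Connect runtime
        sourceTask.stop();
    }
}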

[camel-kafka-connector] 06/09: chore: fix checkstyle.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c96272548b81612944725f553e73a868a8712cb2
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat Mar 6 15:46:21 2021 +0100

    chore: fix checkstyle.
---
 .../camel/kafkaconnector/CamelSourceRecord.java    | 22 +++++++++++++++++++---
 .../camel/kafkaconnector/CamelSourceTask.java      | 20 +++++++++++---------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  2 --
 .../CamelTypeConverterTransformTest.java           |  1 -
 .../hdfs/sink/CamelSinkHDFSITCase.java             | 13 ++++++-------
 .../source/CamelSourceNettyHTTPITCase.java         | 14 +++++---------
 6 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
index 87934ef..5d03b89 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
@@ -1,13 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.kafkaconnector;
 
+import java.util.Map;
+
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Map;
-
 public class CamelSourceRecord extends SourceRecord {
-    private Integer claimCheck = null;
+    private Integer claimCheck;
 
     public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) {
         super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 03d0c1a..51b055d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,6 +16,15 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
@@ -38,14 +47,7 @@ import org.jctools.queues.SpscArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+
 
 public class CamelSourceTask extends SourceTask {
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
@@ -128,7 +130,7 @@ public class CamelSourceTask extends SourceTask {
 
             freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords);
             freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() {
-                int i = 0;
+                int i;
                 @Override
                 public Integer get() {
                     return i++;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 51b4db3..5c99ad0 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.awt.print.PrinterJob;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
@@ -28,7 +27,6 @@ import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index c6cecbf..6da72c2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.kafkaconnector.transforms;
 
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index 55cf21f..a111fdc 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -14,9 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.kafkaconnector.hdfs.sink;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -36,11 +40,6 @@ import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -85,7 +84,7 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
         boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test")
                                                                         &&  hdfsEasy.delete(new Path(currentBasePath, "initTest")));
 
-        if(hdfsServiceCorrectlyStarted) {
+        if (hdfsServiceCorrectlyStarted) {
             if (!hdfsEasy.delete(currentBasePath)) {
                 // This is OK: directory may not exist on the path
                 LOG.debug("The directory at {} was not removed", currentBasePath.getName());
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 48bcb59..0174eb1 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.kafkaconnector.nettyhttp.source;
 
+import java.io.IOException;
+import java.net.InetAddress;
+
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
@@ -25,7 +28,6 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -33,9 +35,6 @@ import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.InetAddress;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -59,9 +58,6 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
         topicName = getTopicForTest(this);
     }
 
-    @AfterEach
-    public void tearDown() {}
-
     @Test
     @Timeout(90)
     public void testBasicSendReceive() throws Exception {
@@ -81,7 +77,7 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
     protected void produceTestData() {
         int retriesLeft = 10;
         boolean success = false;
-        while(retriesLeft > 0 && !success) {
+        while (retriesLeft > 0 && !success) {
             try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
 
                 byte[] ipAddr = new byte[]{127, 0, 0, 1};
@@ -99,7 +95,7 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
                 success = true;
                 LOG.info("Request success at {} attempt.", retriesLeft);
             } catch (IOException e) {
-                if(retriesLeft == 1) {
+                if (retriesLeft == 1) {
                     e.printStackTrace();
                     fail("There should be no exceptions in sending the http test message.");
                 } else {


[camel-kafka-connector] 05/09: Fixed itest for netty-http.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 99f11dfe4005544b12d541ba553dee9f134e9ad6
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat Mar 6 02:44:30 2021 +0100

    Fixed itest for netty-http.
---
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java   | 112 +++++++++++++++++++++
 .../source/CamelSourceNettyHTTPITCase.java         |   2 +-
 tests/pom.xml                                      |   1 -
 3 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
new file mode 100644
index 0000000..96bd27a
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class);
+
+    private MockWebServer mockServer;
+
+    private String topicName;
+
+    private final int expect = 1;
+    private volatile RecordedRequest received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+        mockServer = new MockWebServer();
+        received = null;
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (mockServer != null) {
+            mockServer.shutdown();
+        }
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            received = mockServer.takeRequest();
+        } catch (InterruptedException e) {
+            LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        String expected = "Sink test message 0";
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals("/test", received.getPath(), "Received path differed");
+            assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+                .withTopics(topicName)
+                .withProtocol("http")
+                .withHost(mockServer.getHostName())
+                .withPort(mockServer.getPort())
+                .withPath("test");
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
+                .buildUrl();
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 41cb6e1..48bcb59 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
-    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000);
     private static final String TEST_MESSAGE = "testMessage";
 
     private String topicName;
diff --git a/tests/pom.xml b/tests/pom.xml
index abae69b..4009424 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -53,7 +53,6 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
-        <module>itests-netty-http</module>
         <module>itests-jdbc</module>
         <module>itests-azure-storage-blob</module>
         <module>itests-azure-storage-queue</module>


[camel-kafka-connector] 09/09: chore: regen

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit a85a209e9b80e8646f702cd08d400a4b6a633f25
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Wed Mar 10 17:30:52 2021 +0100

    chore: regen
---
 .../resources/connectors/camel-aws2-s3-sink.json   |  4 +-
 .../resources/connectors/camel-aws2-s3-source.json |  4 +-
 .../resources/connectors/camel-aws2-sns-sink.json  |  4 +-
 .../resources/connectors/camel-aws2-sqs-sink.json  |  4 +-
 .../connectors/camel-aws2-sqs-source.json          |  4 +-
 .../connectors/camel-netty-http-sink.json          | 14 ++++++
 .../connectors/camel-netty-http-source.json        | 14 ++++++
 .../resources/connectors/camel-netty-sink.json     | 14 ++++++
 .../resources/connectors/camel-netty-source.json   | 14 ++++++
 .../connectors/camel-spring-rabbitmq-source.json   | 47 ++++++++++++++++++++
 .../generated/resources/camel-aws2-s3-sink.json    |  4 +-
 .../generated/resources/camel-aws2-s3-source.json  |  4 +-
 .../docs/camel-aws2-s3-kafka-sink-connector.adoc   |  4 +-
 .../docs/camel-aws2-s3-kafka-source-connector.adoc |  4 +-
 .../aws2s3/CamelAws2s3SinkConnectorConfig.java     |  4 +-
 .../aws2s3/CamelAws2s3SourceConnectorConfig.java   |  4 +-
 .../generated/resources/camel-aws2-sns-sink.json   |  4 +-
 .../docs/camel-aws2-sns-kafka-sink-connector.adoc  |  4 +-
 .../aws2sns/CamelAws2snsSinkConnectorConfig.java   |  4 +-
 .../generated/resources/camel-aws2-sqs-sink.json   |  4 +-
 .../generated/resources/camel-aws2-sqs-source.json |  4 +-
 .../docs/camel-aws2-sqs-kafka-sink-connector.adoc  |  4 +-
 .../camel-aws2-sqs-kafka-source-connector.adoc     |  4 +-
 .../aws2sqs/CamelAws2sqsSinkConnectorConfig.java   |  4 +-
 .../aws2sqs/CamelAws2sqsSourceConnectorConfig.java |  4 +-
 .../generated/resources/camel-netty-http-sink.json | 14 ++++++
 .../resources/camel-netty-http-source.json         | 14 ++++++
 .../camel-netty-http-kafka-sink-connector.adoc     |  4 +-
 .../camel-netty-http-kafka-source-connector.adoc   |  4 +-
 .../CamelNettyhttpSinkConnectorConfig.java         |  8 ++++
 .../CamelNettyhttpSourceConnectorConfig.java       |  8 ++++
 .../src/generated/resources/camel-netty-sink.json  | 14 ++++++
 .../generated/resources/camel-netty-source.json    | 14 ++++++
 .../docs/camel-netty-kafka-sink-connector.adoc     |  4 +-
 .../docs/camel-netty-kafka-source-connector.adoc   |  4 +-
 .../netty/CamelNettySinkConnectorConfig.java       |  8 ++++
 .../netty/CamelNettySourceConnectorConfig.java     |  8 ++++
 .../resources/camel-spring-rabbitmq-source.json    | 47 ++++++++++++++++++++
 ...mel-spring-rabbitmq-kafka-source-connector.adoc |  8 +++-
 .../CamelSpringrabbitmqSourceConnectorConfig.java  | 24 +++++++++++
 docs/modules/ROOT/nav.adoc                         | 50 ++++++++++++++++++++++
 docs/modules/ROOT/pages/connectors.adoc            | 24 ++++++++++-
 .../camel-aws2-s3-kafka-sink-connector.adoc        |  4 +-
 .../camel-aws2-s3-kafka-source-connector.adoc      |  4 +-
 .../camel-aws2-sns-kafka-sink-connector.adoc       |  4 +-
 .../camel-aws2-sqs-kafka-sink-connector.adoc       |  4 +-
 .../camel-aws2-sqs-kafka-source-connector.adoc     |  4 +-
 .../camel-netty-http-kafka-sink-connector.adoc     |  4 +-
 .../camel-netty-http-kafka-source-connector.adoc   |  4 +-
 .../camel-netty-kafka-sink-connector.adoc          |  4 +-
 .../camel-netty-kafka-source-connector.adoc        |  4 +-
 ...mel-spring-rabbitmq-kafka-source-connector.adoc |  8 +++-
 52 files changed, 423 insertions(+), 61 deletions(-)

diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json
index 9e183fa..e5674e6 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json
@@ -30,7 +30,7 @@
 		"camel.sink.endpoint.autoCreateBucket": {
 			"name": "camel.sink.endpoint.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -223,7 +223,7 @@
 		"camel.component.aws2-s3.autoCreateBucket": {
 			"name": "camel.component.aws2-s3.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json
index 8621e81..d1169e8 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json
@@ -30,7 +30,7 @@
 		"camel.source.endpoint.autoCreateBucket": {
 			"name": "camel.source.endpoint.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -386,7 +386,7 @@
 		"camel.component.aws2-s3.autoCreateBucket": {
 			"name": "camel.component.aws2-s3.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sns-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sns-sink.json
index 7b059d4..0ef5538 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sns-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sns-sink.json
@@ -24,7 +24,7 @@
 		"camel.sink.endpoint.autoCreateTopic": {
 			"name": "camel.sink.endpoint.autoCreateTopic",
 			"description": "Setting the autocreation of the topic",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -184,7 +184,7 @@
 		"camel.component.aws2-sns.autoCreateTopic": {
 			"name": "camel.component.aws2-sns.autoCreateTopic",
 			"description": "Setting the autocreation of the topic",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-sink.json
index 33feaa7..8272ee5 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-sink.json
@@ -31,7 +31,7 @@
 		"camel.sink.endpoint.autoCreateQueue": {
 			"name": "camel.sink.endpoint.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -235,7 +235,7 @@
 		"camel.component.aws2-sqs.autoCreateQueue": {
 			"name": "camel.component.aws2-sqs.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-source.json
index 22ad918..0a191ab 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-sqs-source.json
@@ -31,7 +31,7 @@
 		"camel.source.endpoint.autoCreateQueue": {
 			"name": "camel.source.endpoint.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -411,7 +411,7 @@
 		"camel.component.aws2-sqs.autoCreateQueue": {
 			"name": "camel.component.aws2-sqs.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
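
Note that the aws2-s3, aws2-sns and aws2-sqs hunks above all flip the autoCreateBucket/autoCreateTopic/autoCreateQueue defaults from "true" to "false", so deployments that relied on implicit resource creation now have to opt back in explicitly. A minimal, hypothetical opt-in for an S3 sink (the property names are taken from the catalog entries above; treating this as an illustrative sketch, not a complete connector configuration):

    # endpoint-level opt-in
    camel.sink.endpoint.autoCreateBucket=true
    # or, equivalently, at component level
    camel.component.aws2-s3.autoCreateBucket=true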
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-sink.json
index f65220a..fcf5581 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-sink.json
@@ -180,6 +180,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.hostnameVerification": {
+			"name": "camel.sink.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.allowSerializedHeaders": {
 			"name": "camel.sink.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -506,6 +513,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty-http.hostnameVerification": {
+			"name": "camel.component.netty-http.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty-http.allowSerializedHeaders": {
 			"name": "camel.component.netty-http.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-source.json
index a0e8a6e..6ac3af1 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-http-source.json
@@ -282,6 +282,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.source.endpoint.hostnameVerification": {
+			"name": "camel.source.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.allowSerializedHeaders": {
 			"name": "camel.source.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -683,6 +690,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty-http.hostnameVerification": {
+			"name": "camel.component.netty-http.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty-http.allowSerializedHeaders": {
 			"name": "camel.component.netty-http.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-sink.json
index 04d72ab..7bcdce9 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-sink.json
@@ -160,6 +160,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.hostnameVerification": {
+			"name": "camel.sink.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.allowSerializedHeaders": {
 			"name": "camel.sink.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -506,6 +513,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty.hostnameVerification": {
+			"name": "camel.component.netty.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty.allowSerializedHeaders": {
 			"name": "camel.component.netty.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-source.json
index 6a7e1ba..171e4aa 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-netty-source.json
@@ -221,6 +221,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.source.endpoint.hostnameVerification": {
+			"name": "camel.source.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.allowSerializedHeaders": {
 			"name": "camel.source.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -630,6 +637,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty.hostnameVerification": {
+			"name": "camel.component.netty.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty.allowSerializedHeaders": {
 			"name": "camel.component.netty.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-spring-rabbitmq-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-spring-rabbitmq-source.json
index adbe1f5..309945c 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-spring-rabbitmq-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-spring-rabbitmq-source.json
@@ -144,6 +144,12 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.source.endpoint.concurrentConsumers": {
+			"name": "camel.source.endpoint.concurrentConsumers",
+			"description": "The number of consumers",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.exceptionHandler": {
 			"name": "camel.source.endpoint.exceptionHandler",
 			"description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.",
@@ -161,6 +167,23 @@
 				"InOptionalOut"
 			]
 		},
+		"camel.source.endpoint.maxConcurrentConsumers": {
+			"name": "camel.source.endpoint.maxConcurrentConsumers",
+			"description": "The maximum number of consumers (available only with SMLC)",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
+		"camel.source.endpoint.messageListenerContainerType": {
+			"name": "camel.source.endpoint.messageListenerContainerType",
+			"description": "The type of the MessageListenerContainer One of: [DMLC] [SMLC]",
+			"defaultValue": "\"DMLC\"",
+			"priority": "MEDIUM",
+			"required": "false",
+			"enum": [
+				"DMLC",
+				"SMLC"
+			]
+		},
 		"camel.source.endpoint.prefetchCount": {
 			"name": "camel.source.endpoint.prefetchCount",
 			"description": "Tell the broker how many messages to send in a single request. Often this can be set quite high to improve throughput.",
@@ -263,6 +286,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.spring-rabbitmq.concurrentConsumers": {
+			"name": "camel.component.spring-rabbitmq.concurrentConsumers",
+			"description": "The number of consumers",
+			"defaultValue": "1",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.spring-rabbitmq.errorHandler": {
 			"name": "camel.component.spring-rabbitmq.errorHandler",
 			"description": "To use a custom ErrorHandler for handling exceptions from the message listener (consumer)",
@@ -275,6 +305,23 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.spring-rabbitmq.maxConcurrentConsumers": {
+			"name": "camel.component.spring-rabbitmq.maxConcurrentConsumers",
+			"description": "The maximum number of consumers (available only with SMLC)",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
+		"camel.component.spring-rabbitmq.messageListenerContainerType": {
+			"name": "camel.component.spring-rabbitmq.messageListenerContainerType",
+			"description": "The type of the MessageListenerContainer One of: [DMLC] [SMLC]",
+			"defaultValue": "\"DMLC\"",
+			"priority": "MEDIUM",
+			"required": "false",
+			"enum": [
+				"DMLC",
+				"SMLC"
+			]
+		},
 		"camel.component.spring-rabbitmq.prefetchCount": {
 			"name": "camel.component.spring-rabbitmq.prefetchCount",
 			"description": "Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.",
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json
index 9e183fa..e5674e6 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json
+++ b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json
@@ -30,7 +30,7 @@
 		"camel.sink.endpoint.autoCreateBucket": {
 			"name": "camel.sink.endpoint.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -223,7 +223,7 @@
 		"camel.component.aws2-s3.autoCreateBucket": {
 			"name": "camel.component.aws2-s3.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json
index 8621e81..d1169e8 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json
+++ b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json
@@ -30,7 +30,7 @@
 		"camel.source.endpoint.autoCreateBucket": {
 			"name": "camel.source.endpoint.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -386,7 +386,7 @@
 		"camel.component.aws2-s3.autoCreateBucket": {
 			"name": "camel.component.aws2-s3.autoCreateBucket",
 			"description": "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc
index 597aa87..1a99602 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-s3 sink connector supports 59 options, which are listed below.
 | *camel.sink.path.bucketNameOrArn* | Bucket name or ARN | null | true | HIGH
 | *camel.sink.endpoint.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.sink.endpoint.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.sink.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.sink.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.sink.endpoint.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.sink.endpoint.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
 | *camel.sink.endpoint.policy* | The policy for this queue to set in the com.amazonaws.services.s3.AmazonS3#setBucketPolicy() method. | null | false | MEDIUM
@@ -62,7 +62,7 @@ The camel-aws2-s3 sink connector supports 59 options, which are listed below.
 | *camel.sink.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.component.aws2-s3.configuration* | The component configuration | null | false | MEDIUM
 | *camel.component.aws2-s3.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-s3.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc
index 909c149..81fb275 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-s3 source connector supports 85 options, which are listed below.
 | *camel.source.path.bucketNameOrArn* | Bucket name or ARN | null | true | HIGH
 | *camel.source.endpoint.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.source.endpoint.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.source.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.source.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.source.endpoint.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.source.endpoint.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
 | *camel.source.endpoint.policy* | The policy for this queue to set in the com.amazonaws.services.s3.AmazonS3#setBucketPolicy() method. | null | false | MEDIUM
@@ -85,7 +85,7 @@ The camel-aws2-s3 source connector supports 85 options, which are listed below.
 | *camel.source.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.component.aws2-s3.configuration* | The component configuration | null | false | MEDIUM
 | *camel.component.aws2-s3.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-s3.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SinkConnectorConfig.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SinkConnectorConfig.java
index 86815fd..587fe04 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SinkConnectorConfig.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SinkConnectorConfig.java
@@ -35,7 +35,7 @@ public class CamelAws2s3SinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_AWS2S3_ENDPOINT_AMAZON_S3PRESIGNER_DEFAULT = null;
     public static final String CAMEL_SINK_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_CONF = "camel.sink.endpoint.autoCreateBucket";
     public static final String CAMEL_SINK_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_DOC = "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.";
-    public static final Boolean CAMEL_SINK_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_DEFAULT = true;
+    public static final Boolean CAMEL_SINK_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_DEFAULT = false;
     public static final String CAMEL_SINK_AWS2S3_ENDPOINT_OVERRIDE_ENDPOINT_CONF = "camel.sink.endpoint.overrideEndpoint";
     public static final String CAMEL_SINK_AWS2S3_ENDPOINT_OVERRIDE_ENDPOINT_DOC = "Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option";
     public static final Boolean CAMEL_SINK_AWS2S3_ENDPOINT_OVERRIDE_ENDPOINT_DEFAULT = false;
@@ -119,7 +119,7 @@ public class CamelAws2s3SinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_AWS2S3_COMPONENT_AMAZON_S3PRESIGNER_DEFAULT = null;
     public static final String CAMEL_SINK_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_CONF = "camel.component.aws2-s3.autoCreateBucket";
     public static final String CAMEL_SINK_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_DOC = "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.";
-    public static final Boolean CAMEL_SINK_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_DEFAULT = true;
+    public static final Boolean CAMEL_SINK_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_DEFAULT = false;
     public static final String CAMEL_SINK_AWS2S3_COMPONENT_CONFIGURATION_CONF = "camel.component.aws2-s3.configuration";
     public static final String CAMEL_SINK_AWS2S3_COMPONENT_CONFIGURATION_DOC = "The component configuration";
     public static final String CAMEL_SINK_AWS2S3_COMPONENT_CONFIGURATION_DEFAULT = null;
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SourceConnectorConfig.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SourceConnectorConfig.java
index 5b1608a..6c4edc9 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SourceConnectorConfig.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SourceConnectorConfig.java
@@ -37,7 +37,7 @@ public class CamelAws2s3SourceConnectorConfig
     public static final String CAMEL_SOURCE_AWS2S3_ENDPOINT_AMAZON_S3PRESIGNER_DEFAULT = null;
     public static final String CAMEL_SOURCE_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_CONF = "camel.source.endpoint.autoCreateBucket";
     public static final String CAMEL_SOURCE_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_DOC = "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.";
-    public static final Boolean CAMEL_SOURCE_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_DEFAULT = true;
+    public static final Boolean CAMEL_SOURCE_AWS2S3_ENDPOINT_AUTO_CREATE_BUCKET_DEFAULT = false;
     public static final String CAMEL_SOURCE_AWS2S3_ENDPOINT_OVERRIDE_ENDPOINT_CONF = "camel.source.endpoint.overrideEndpoint";
     public static final String CAMEL_SOURCE_AWS2S3_ENDPOINT_OVERRIDE_ENDPOINT_DOC = "Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option";
     public static final Boolean CAMEL_SOURCE_AWS2S3_ENDPOINT_OVERRIDE_ENDPOINT_DEFAULT = false;
@@ -190,7 +190,7 @@ public class CamelAws2s3SourceConnectorConfig
     public static final String CAMEL_SOURCE_AWS2S3_COMPONENT_AMAZON_S3PRESIGNER_DEFAULT = null;
     public static final String CAMEL_SOURCE_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_CONF = "camel.component.aws2-s3.autoCreateBucket";
     public static final String CAMEL_SOURCE_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_DOC = "Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already.";
-    public static final Boolean CAMEL_SOURCE_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_DEFAULT = true;
+    public static final Boolean CAMEL_SOURCE_AWS2S3_COMPONENT_AUTO_CREATE_BUCKET_DEFAULT = false;
     public static final String CAMEL_SOURCE_AWS2S3_COMPONENT_CONFIGURATION_CONF = "camel.component.aws2-s3.configuration";
     public static final String CAMEL_SOURCE_AWS2S3_COMPONENT_CONFIGURATION_DOC = "The component configuration";
     public static final String CAMEL_SOURCE_AWS2S3_COMPONENT_CONFIGURATION_DEFAULT = null;
diff --git a/connectors/camel-aws2-sns-kafka-connector/src/generated/resources/camel-aws2-sns-sink.json b/connectors/camel-aws2-sns-kafka-connector/src/generated/resources/camel-aws2-sns-sink.json
index 7b059d4..0ef5538 100644
--- a/connectors/camel-aws2-sns-kafka-connector/src/generated/resources/camel-aws2-sns-sink.json
+++ b/connectors/camel-aws2-sns-kafka-connector/src/generated/resources/camel-aws2-sns-sink.json
@@ -24,7 +24,7 @@
 		"camel.sink.endpoint.autoCreateTopic": {
 			"name": "camel.sink.endpoint.autoCreateTopic",
 			"description": "Setting the autocreation of the topic",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -184,7 +184,7 @@
 		"camel.component.aws2-sns.autoCreateTopic": {
 			"name": "camel.component.aws2-sns.autoCreateTopic",
 			"description": "Setting the autocreation of the topic",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/connectors/camel-aws2-sns-kafka-connector/src/main/docs/camel-aws2-sns-kafka-sink-connector.adoc b/connectors/camel-aws2-sns-kafka-connector/src/main/docs/camel-aws2-sns-kafka-sink-connector.adoc
index df40bba..f36dc00 100644
--- a/connectors/camel-aws2-sns-kafka-connector/src/main/docs/camel-aws2-sns-kafka-sink-connector.adoc
+++ b/connectors/camel-aws2-sns-kafka-connector/src/main/docs/camel-aws2-sns-kafka-sink-connector.adoc
@@ -33,7 +33,7 @@ The camel-aws2-sns sink connector supports 48 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.topicNameOrArn* | Topic name or ARN | null | true | HIGH
 | *camel.sink.endpoint.amazonSNSClient* | To use the AmazonSNS as the client | null | false | MEDIUM
-| *camel.sink.endpoint.autoCreateTopic* | Setting the autocreation of the topic | true | false | MEDIUM
+| *camel.sink.endpoint.autoCreateTopic* | Setting the autocreation of the topic | false | false | MEDIUM
 | *camel.sink.endpoint.headerFilterStrategy* | To use a custom HeaderFilterStrategy to map headers to/from Camel. | null | false | MEDIUM
 | *camel.sink.endpoint.kmsMasterKeyId* | The ID of an AWS-managed customer master key (CMK) for Amazon SNS or a custom CMK. | null | false | MEDIUM
 | *camel.sink.endpoint.lazyStartProducer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then cre [...]
@@ -56,7 +56,7 @@ The camel-aws2-sns sink connector supports 48 options, which are listed below.
 | *camel.sink.endpoint.accessKey* | Amazon AWS Access Key | null | false | MEDIUM
 | *camel.sink.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-sns.amazonSNSClient* | To use the AmazonSNS as the client | null | false | MEDIUM
-| *camel.component.aws2-sns.autoCreateTopic* | Setting the autocreation of the topic | true | false | MEDIUM
+| *camel.component.aws2-sns.autoCreateTopic* | Setting the autocreation of the topic | false | false | MEDIUM
 | *camel.component.aws2-sns.configuration* | Component configuration | null | false | MEDIUM
 | *camel.component.aws2-sns.kmsMasterKeyId* | The ID of an AWS-managed customer master key (CMK) for Amazon SNS or a custom CMK. | null | false | MEDIUM
 | *camel.component.aws2-sns.lazyStartProducer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed the [...]
diff --git a/connectors/camel-aws2-sns-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sns/CamelAws2snsSinkConnectorConfig.java b/connectors/camel-aws2-sns-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sns/CamelAws2snsSinkConnectorConfig.java
index c816c82..321de5b 100644
--- a/connectors/camel-aws2-sns-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sns/CamelAws2snsSinkConnectorConfig.java
+++ b/connectors/camel-aws2-sns-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sns/CamelAws2snsSinkConnectorConfig.java
@@ -32,7 +32,7 @@ public class CamelAws2snsSinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_AWS2SNS_ENDPOINT_AMAZON_SNSCLIENT_DEFAULT = null;
     public static final String CAMEL_SINK_AWS2SNS_ENDPOINT_AUTO_CREATE_TOPIC_CONF = "camel.sink.endpoint.autoCreateTopic";
     public static final String CAMEL_SINK_AWS2SNS_ENDPOINT_AUTO_CREATE_TOPIC_DOC = "Setting the autocreation of the topic";
-    public static final Boolean CAMEL_SINK_AWS2SNS_ENDPOINT_AUTO_CREATE_TOPIC_DEFAULT = true;
+    public static final Boolean CAMEL_SINK_AWS2SNS_ENDPOINT_AUTO_CREATE_TOPIC_DEFAULT = false;
     public static final String CAMEL_SINK_AWS2SNS_ENDPOINT_HEADER_FILTER_STRATEGY_CONF = "camel.sink.endpoint.headerFilterStrategy";
     public static final String CAMEL_SINK_AWS2SNS_ENDPOINT_HEADER_FILTER_STRATEGY_DOC = "To use a custom HeaderFilterStrategy to map headers to/from Camel.";
     public static final String CAMEL_SINK_AWS2SNS_ENDPOINT_HEADER_FILTER_STRATEGY_DEFAULT = null;
@@ -101,7 +101,7 @@ public class CamelAws2snsSinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_AWS2SNS_COMPONENT_AMAZON_SNSCLIENT_DEFAULT = null;
     public static final String CAMEL_SINK_AWS2SNS_COMPONENT_AUTO_CREATE_TOPIC_CONF = "camel.component.aws2-sns.autoCreateTopic";
     public static final String CAMEL_SINK_AWS2SNS_COMPONENT_AUTO_CREATE_TOPIC_DOC = "Setting the autocreation of the topic";
-    public static final Boolean CAMEL_SINK_AWS2SNS_COMPONENT_AUTO_CREATE_TOPIC_DEFAULT = true;
+    public static final Boolean CAMEL_SINK_AWS2SNS_COMPONENT_AUTO_CREATE_TOPIC_DEFAULT = false;
     public static final String CAMEL_SINK_AWS2SNS_COMPONENT_CONFIGURATION_CONF = "camel.component.aws2-sns.configuration";
     public static final String CAMEL_SINK_AWS2SNS_COMPONENT_CONFIGURATION_DOC = "Component configuration";
     public static final String CAMEL_SINK_AWS2SNS_COMPONENT_CONFIGURATION_DEFAULT = null;
diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-sink.json b/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-sink.json
index 33feaa7..8272ee5 100644
--- a/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-sink.json
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-sink.json
@@ -31,7 +31,7 @@
 		"camel.sink.endpoint.autoCreateQueue": {
 			"name": "camel.sink.endpoint.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -235,7 +235,7 @@
 		"camel.component.aws2-sqs.autoCreateQueue": {
 			"name": "camel.component.aws2-sqs.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-source.json b/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-source.json
index 22ad918..0a191ab 100644
--- a/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-source.json
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/generated/resources/camel-aws2-sqs-source.json
@@ -31,7 +31,7 @@
 		"camel.source.endpoint.autoCreateQueue": {
 			"name": "camel.source.endpoint.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
@@ -411,7 +411,7 @@
 		"camel.component.aws2-sqs.autoCreateQueue": {
 			"name": "camel.component.aws2-sqs.autoCreateQueue",
 			"description": "Setting the autocreation of the queue",
-			"defaultValue": "true",
+			"defaultValue": "false",
 			"priority": "MEDIUM",
 			"required": "false"
 		},
diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-sink-connector.adoc b/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-sink-connector.adoc
index 9f58eef..80d7a6f 100644
--- a/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-sink-connector.adoc
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-sink-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-sqs sink connector supports 60 options, which are listed below.
 | *camel.sink.path.queueNameOrArn* | Queue name or ARN | null | true | HIGH
 | *camel.sink.endpoint.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.sink.endpoint.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.sink.endpoint.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.sink.endpoint.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.sink.endpoint.headerFilterStrategy* | To use a custom HeaderFilterStrategy to map headers to/from Camel. | null | false | MEDIUM
 | *camel.sink.endpoint.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.sink.endpoint.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
@@ -63,7 +63,7 @@ The camel-aws2-sqs sink connector supports 60 options, which are listed below.
 | *camel.sink.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-sqs.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.component.aws2-sqs.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.component.aws2-sqs.configuration* | The AWS SQS default configuration | null | false | MEDIUM
 | *camel.component.aws2-sqs.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-sqs.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-source-connector.adoc b/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-source-connector.adoc
index ba1b605..af08ebd 100644
--- a/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-source-connector.adoc
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/main/docs/camel-aws2-sqs-kafka-source-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-sqs source connector supports 93 options, which are listed below.
 | *camel.source.path.queueNameOrArn* | Queue name or ARN | null | true | HIGH
 | *camel.source.endpoint.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.source.endpoint.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.source.endpoint.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.source.endpoint.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.source.endpoint.headerFilterStrategy* | To use a custom HeaderFilterStrategy to map headers to/from Camel. | null | false | MEDIUM
 | *camel.source.endpoint.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.source.endpoint.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
@@ -89,7 +89,7 @@ The camel-aws2-sqs source connector supports 93 options, which are listed below.
 | *camel.source.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-sqs.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.component.aws2-sqs.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.component.aws2-sqs.configuration* | The AWS SQS default configuration | null | false | MEDIUM
 | *camel.component.aws2-sqs.overrideEndpoint* | Set the need for overidding the endpoint. This option needs to be used in combination with uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-sqs.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSinkConnectorConfig.java b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSinkConnectorConfig.java
index 5a373ed..6c270be 100644
--- a/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSinkConnectorConfig.java
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSinkConnectorConfig.java
@@ -35,7 +35,7 @@ public class CamelAws2sqsSinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_AWS2SQS_ENDPOINT_AMAZON_SQSCLIENT_DEFAULT = null;
     public static final String CAMEL_SINK_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_CONF = "camel.sink.endpoint.autoCreateQueue";
     public static final String CAMEL_SINK_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_DOC = "Setting the autocreation of the queue";
-    public static final Boolean CAMEL_SINK_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_DEFAULT = true;
+    public static final Boolean CAMEL_SINK_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_DEFAULT = false;
     public static final String CAMEL_SINK_AWS2SQS_ENDPOINT_HEADER_FILTER_STRATEGY_CONF = "camel.sink.endpoint.headerFilterStrategy";
     public static final String CAMEL_SINK_AWS2SQS_ENDPOINT_HEADER_FILTER_STRATEGY_DOC = "To use a custom HeaderFilterStrategy to map headers to/from Camel.";
     public static final String CAMEL_SINK_AWS2SQS_ENDPOINT_HEADER_FILTER_STRATEGY_DEFAULT = null;
@@ -122,7 +122,7 @@ public class CamelAws2sqsSinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_AWS2SQS_COMPONENT_AMAZON_SQSCLIENT_DEFAULT = null;
     public static final String CAMEL_SINK_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_CONF = "camel.component.aws2-sqs.autoCreateQueue";
     public static final String CAMEL_SINK_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_DOC = "Setting the autocreation of the queue";
-    public static final Boolean CAMEL_SINK_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_DEFAULT = true;
+    public static final Boolean CAMEL_SINK_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_DEFAULT = false;
     public static final String CAMEL_SINK_AWS2SQS_COMPONENT_CONFIGURATION_CONF = "camel.component.aws2-sqs.configuration";
     public static final String CAMEL_SINK_AWS2SQS_COMPONENT_CONFIGURATION_DOC = "The AWS SQS default configuration";
     public static final String CAMEL_SINK_AWS2SQS_COMPONENT_CONFIGURATION_DEFAULT = null;
diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSourceConnectorConfig.java b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSourceConnectorConfig.java
index e23696c..74372df 100644
--- a/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSourceConnectorConfig.java
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/CamelAws2sqsSourceConnectorConfig.java
@@ -37,7 +37,7 @@ public class CamelAws2sqsSourceConnectorConfig
     public static final String CAMEL_SOURCE_AWS2SQS_ENDPOINT_AMAZON_SQSCLIENT_DEFAULT = null;
     public static final String CAMEL_SOURCE_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_CONF = "camel.source.endpoint.autoCreateQueue";
     public static final String CAMEL_SOURCE_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_DOC = "Setting the autocreation of the queue";
-    public static final Boolean CAMEL_SOURCE_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_DEFAULT = true;
+    public static final Boolean CAMEL_SOURCE_AWS2SQS_ENDPOINT_AUTO_CREATE_QUEUE_DEFAULT = false;
     public static final String CAMEL_SOURCE_AWS2SQS_ENDPOINT_HEADER_FILTER_STRATEGY_CONF = "camel.source.endpoint.headerFilterStrategy";
     public static final String CAMEL_SOURCE_AWS2SQS_ENDPOINT_HEADER_FILTER_STRATEGY_DOC = "To use a custom HeaderFilterStrategy to map headers to/from Camel.";
     public static final String CAMEL_SOURCE_AWS2SQS_ENDPOINT_HEADER_FILTER_STRATEGY_DEFAULT = null;
@@ -202,7 +202,7 @@ public class CamelAws2sqsSourceConnectorConfig
     public static final String CAMEL_SOURCE_AWS2SQS_COMPONENT_AMAZON_SQSCLIENT_DEFAULT = null;
     public static final String CAMEL_SOURCE_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_CONF = "camel.component.aws2-sqs.autoCreateQueue";
     public static final String CAMEL_SOURCE_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_DOC = "Setting the autocreation of the queue";
-    public static final Boolean CAMEL_SOURCE_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_DEFAULT = true;
+    public static final Boolean CAMEL_SOURCE_AWS2SQS_COMPONENT_AUTO_CREATE_QUEUE_DEFAULT = false;
     public static final String CAMEL_SOURCE_AWS2SQS_COMPONENT_CONFIGURATION_CONF = "camel.component.aws2-sqs.configuration";
     public static final String CAMEL_SOURCE_AWS2SQS_COMPONENT_CONFIGURATION_DOC = "The AWS SQS default configuration";
     public static final String CAMEL_SOURCE_AWS2SQS_COMPONENT_CONFIGURATION_DEFAULT = null;
diff --git a/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-sink.json b/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-sink.json
index f65220a..fcf5581 100644
--- a/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-sink.json
+++ b/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-sink.json
@@ -180,6 +180,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.hostnameVerification": {
+			"name": "camel.sink.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.allowSerializedHeaders": {
 			"name": "camel.sink.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -506,6 +513,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty-http.hostnameVerification": {
+			"name": "camel.component.netty-http.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty-http.allowSerializedHeaders": {
 			"name": "camel.component.netty-http.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-source.json b/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-source.json
index a0e8a6e..6ac3af1 100644
--- a/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-source.json
+++ b/connectors/camel-netty-http-kafka-connector/src/generated/resources/camel-netty-http-source.json
@@ -282,6 +282,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.source.endpoint.hostnameVerification": {
+			"name": "camel.source.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.allowSerializedHeaders": {
 			"name": "camel.source.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -683,6 +690,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty-http.hostnameVerification": {
+			"name": "camel.component.netty-http.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty-http.allowSerializedHeaders": {
 			"name": "camel.component.netty-http.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-sink-connector.adoc b/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-sink-connector.adoc
index b00f735..872422c 100644
--- a/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-sink-connector.adoc
+++ b/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSinkConn
 ----
 
 
-The camel-netty-http sink connector supports 111 options, which are listed below.
+The camel-netty-http sink connector supports 113 options, which are listed below.
 
 
 
@@ -56,6 +56,7 @@ The camel-netty-http sink connector supports 111 options, which are listed below
 | *camel.sink.endpoint.producerPoolMinEvictableIdle* | Sets the minimum amount of time (value in millis) an object may sit idle in the pool before it is eligible for eviction by the idle object evictor. | 300000L | false | MEDIUM
 | *camel.sink.endpoint.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.sink.endpoint.useRelativePath* | Sets whether to use a relative path in HTTP requests. | true | false | MEDIUM
+| *camel.sink.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.sink.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.sink.endpoint.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
 | *camel.sink.endpoint.configuration* | To use a custom configured NettyHttpConfiguration for configuring this endpoint. | null | false | MEDIUM
@@ -106,6 +107,7 @@ The camel-netty-http sink connector supports 111 options, which are listed below
 | *camel.component.netty-http.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.component.netty-http.udpConnectionless Sending* | This option supports connection less udp sending which is a real fire and forget. A connected udp send receive the PortUnreachableException if no one is listen on the receiving port. | false | false | MEDIUM
 | *camel.component.netty-http.useByteBuf* | If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out. | false | false | MEDIUM
+| *camel.component.netty-http.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty-http.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty-http.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty-http.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
diff --git a/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-source-connector.adoc b/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-source-connector.adoc
index 9d9a9b3..5ea81c5 100644
--- a/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-source-connector.adoc
+++ b/connectors/camel-netty-http-kafka-connector/src/main/docs/camel-netty-http-kafka-source-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceCo
 ----
 
 
-The camel-netty-http source connector supports 131 options, which are listed below.
+The camel-netty-http source connector supports 133 options, which are listed below.
 
 
 
@@ -67,6 +67,7 @@ The camel-netty-http source connector supports 131 options, which are listed bel
 | *camel.source.endpoint.traceEnabled* | Specifies whether to enable HTTP TRACE for this Netty HTTP consumer. By default TRACE is turned off. | false | false | MEDIUM
 | *camel.source.endpoint.urlDecodeHeaders* | If this option is enabled, then during binding from Netty to Camel Message then the header values will be URL decoded (eg %20 will be a space character. Notice this option is used by the default org.apache.camel.component.netty.http.NettyHttpBinding and therefore if you implement a custom org.apache.camel.component.netty.http.NettyHttpBinding then you would need to decode the headers accordingly to this option. | false | false | MEDIUM
 | *camel.source.endpoint.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.source.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.source.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.source.endpoint.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
 | *camel.source.endpoint.configuration* | To use a custom configured NettyHttpConfiguration for configuring this endpoint. | null | false | MEDIUM
@@ -125,6 +126,7 @@ The camel-netty-http source connector supports 131 options, which are listed bel
 | *camel.component.netty-http.serverExceptionCaught LogLevel* | If the server (NettyConsumer) catches an exception then its logged using this logging level. One of: [TRACE] [DEBUG] [INFO] [WARN] [ERROR] [OFF] | "WARN" | false | MEDIUM
 | *camel.component.netty-http.serverInitializer Factory* | To use a custom ServerInitializerFactory | null | false | MEDIUM
 | *camel.component.netty-http.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.component.netty-http.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty-http.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty-http.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty-http.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
diff --git a/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSinkConnectorConfig.java b/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSinkConnectorConfig.java
index 9c8dae1..9596363 100644
--- a/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSinkConnectorConfig.java
+++ b/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSinkConnectorConfig.java
@@ -101,6 +101,9 @@ public class CamelNettyhttpSinkConnectorConfig
     public static final String CAMEL_SINK_NETTYHTTP_ENDPOINT_USE_RELATIVE_PATH_CONF = "camel.sink.endpoint.useRelativePath";
     public static final String CAMEL_SINK_NETTYHTTP_ENDPOINT_USE_RELATIVE_PATH_DOC = "Sets whether to use a relative path in HTTP requests.";
     public static final Boolean CAMEL_SINK_NETTYHTTP_ENDPOINT_USE_RELATIVE_PATH_DEFAULT = true;
+    public static final String CAMEL_SINK_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_CONF = "camel.sink.endpoint.hostnameVerification";
+    public static final String CAMEL_SINK_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SINK_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SINK_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.sink.endpoint.allowSerializedHeaders";
     public static final String CAMEL_SINK_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SINK_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -251,6 +254,9 @@ public class CamelNettyhttpSinkConnectorConfig
     public static final String CAMEL_SINK_NETTYHTTP_COMPONENT_USE_BYTE_BUF_CONF = "camel.component.netty-http.useByteBuf";
     public static final String CAMEL_SINK_NETTYHTTP_COMPONENT_USE_BYTE_BUF_DOC = "If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out.";
     public static final Boolean CAMEL_SINK_NETTYHTTP_COMPONENT_USE_BYTE_BUF_DEFAULT = false;
+    public static final String CAMEL_SINK_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_CONF = "camel.component.netty-http.hostnameVerification";
+    public static final String CAMEL_SINK_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SINK_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SINK_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.component.netty-http.allowSerializedHeaders";
     public static final String CAMEL_SINK_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SINK_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -397,6 +403,7 @@ public class CamelNettyhttpSinkConnectorConfig
         conf.define(CAMEL_SINK_NETTYHTTP_ENDPOINT_PRODUCER_POOL_MIN_EVICTABLE_IDLE_CONF, ConfigDef.Type.LONG, CAMEL_SINK_NETTYHTTP_ENDPOINT_PRODUCER_POOL_MIN_EVICTABLE_IDLE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_ENDPOINT_PRODUCER_POOL_MIN_EVICTABLE_IDLE_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_ENDPOINT_PRODUCER_POOL_MIN_IDLE_CONF, ConfigDef.Type.INT, CAMEL_SINK_NETTYHTTP_ENDPOINT_PRODUCER_POOL_MIN_IDLE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_ENDPOINT_PRODUCER_POOL_MIN_IDLE_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_ENDPOINT_USE_RELATIVE_PATH_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_ENDPOINT_USE_RELATIVE_PATH_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_ENDPOINT_USE_RELATIVE_PATH_DOC);
+        conf.define(CAMEL_SINK_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_ENDPOINT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SINK_NETTYHTTP_ENDPOINT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_ENDPOINT_CHANNEL_GROUP_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_ENDPOINT_CONFIGURATION_CONF, ConfigDef.Type.STRING, CAMEL_SINK_NETTYHTTP_ENDPOINT_CONFIGURATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_ENDPOINT_CONFIGURATION_DOC);
@@ -447,6 +454,7 @@ public class CamelNettyhttpSinkConnectorConfig
         conf.define(CAMEL_SINK_NETTYHTTP_COMPONENT_PRODUCER_POOL_MIN_IDLE_CONF, ConfigDef.Type.INT, CAMEL_SINK_NETTYHTTP_COMPONENT_PRODUCER_POOL_MIN_IDLE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_COMPONENT_PRODUCER_POOL_MIN_IDLE_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_COMPONENT_UDP_CONNECTIONLESS_SENDING_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_COMPONENT_UDP_CONNECTIONLESS_SENDING_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_COMPONENT_UDP_CONNECTIONLESS_SENDING_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_COMPONENT_USE_BYTE_BUF_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_COMPONENT_USE_BYTE_BUF_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_COMPONENT_USE_BYTE_BUF_DOC);
+        conf.define(CAMEL_SINK_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_COMPONENT_AUTOWIRED_ENABLED_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTYHTTP_COMPONENT_AUTOWIRED_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_COMPONENT_AUTOWIRED_ENABLED_DOC);
         conf.define(CAMEL_SINK_NETTYHTTP_COMPONENT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SINK_NETTYHTTP_COMPONENT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTYHTTP_COMPONENT_CHANNEL_GROUP_DOC);
diff --git a/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSourceConnectorConfig.java b/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSourceConnectorConfig.java
index 1da1aa7..d75bf26 100644
--- a/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSourceConnectorConfig.java
+++ b/connectors/camel-netty-http-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/nettyhttp/CamelNettyhttpSourceConnectorConfig.java
@@ -134,6 +134,9 @@ public class CamelNettyhttpSourceConnectorConfig
     public static final String CAMEL_SOURCE_NETTYHTTP_ENDPOINT_USING_EXECUTOR_SERVICE_CONF = "camel.source.endpoint.usingExecutorService";
     public static final String CAMEL_SOURCE_NETTYHTTP_ENDPOINT_USING_EXECUTOR_SERVICE_DOC = "Whether to use ordered thread pool, to ensure events are processed orderly on the same channel.";
     public static final Boolean CAMEL_SOURCE_NETTYHTTP_ENDPOINT_USING_EXECUTOR_SERVICE_DEFAULT = true;
+    public static final String CAMEL_SOURCE_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_CONF = "camel.source.endpoint.hostnameVerification";
+    public static final String CAMEL_SOURCE_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SOURCE_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SOURCE_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.source.endpoint.allowSerializedHeaders";
     public static final String CAMEL_SOURCE_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SOURCE_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -308,6 +311,9 @@ public class CamelNettyhttpSourceConnectorConfig
     public static final String CAMEL_SOURCE_NETTYHTTP_COMPONENT_USING_EXECUTOR_SERVICE_CONF = "camel.component.netty-http.usingExecutorService";
     public static final String CAMEL_SOURCE_NETTYHTTP_COMPONENT_USING_EXECUTOR_SERVICE_DOC = "Whether to use ordered thread pool, to ensure events are processed orderly on the same channel.";
     public static final Boolean CAMEL_SOURCE_NETTYHTTP_COMPONENT_USING_EXECUTOR_SERVICE_DEFAULT = true;
+    public static final String CAMEL_SOURCE_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_CONF = "camel.component.netty-http.hostnameVerification";
+    public static final String CAMEL_SOURCE_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SOURCE_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SOURCE_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.component.netty-http.allowSerializedHeaders";
     public static final String CAMEL_SOURCE_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SOURCE_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -468,6 +474,7 @@ public class CamelNettyhttpSourceConnectorConfig
         conf.define(CAMEL_SOURCE_NETTYHTTP_ENDPOINT_TRACE_ENABLED_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_TRACE_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_TRACE_ENABLED_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_ENDPOINT_URL_DECODE_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_URL_DECODE_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_URL_DECODE_HEADERS_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_ENDPOINT_USING_EXECUTOR_SERVICE_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_USING_EXECUTOR_SERVICE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_USING_EXECUTOR_SERVICE_DOC);
+        conf.define(CAMEL_SOURCE_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_ENDPOINT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_CHANNEL_GROUP_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_ENDPOINT_CONFIGURATION_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_CONFIGURATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_ENDPOINT_CONFIGURATION_DOC);
@@ -526,6 +533,7 @@ public class CamelNettyhttpSourceConnectorConfig
         conf.define(CAMEL_SOURCE_NETTYHTTP_COMPONENT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTYHTTP_COMPONENT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_COMPONENT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_COMPONENT_SERVER_INITIALIZER_FACTORY_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTYHTTP_COMPONENT_SERVER_INITIALIZER_FACTORY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_COMPONENT_SERVER_INITIALIZER_FACTORY_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_COMPONENT_USING_EXECUTOR_SERVICE_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_COMPONENT_USING_EXECUTOR_SERVICE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_COMPONENT_USING_EXECUTOR_SERVICE_DOC);
+        conf.define(CAMEL_SOURCE_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_COMPONENT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_COMPONENT_AUTOWIRED_ENABLED_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTYHTTP_COMPONENT_AUTOWIRED_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_COMPONENT_AUTOWIRED_ENABLED_DOC);
         conf.define(CAMEL_SOURCE_NETTYHTTP_COMPONENT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTYHTTP_COMPONENT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTYHTTP_COMPONENT_CHANNEL_GROUP_DOC);
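
The new camel.sink.endpoint.hostnameVerification / camel.source.endpoint.hostnameVerification and camel.component.netty-http.hostnameVerification options are ordinary boolean connector properties, so they can be turned on next to the existing TLS settings. A minimal, hypothetical sink configuration sketch (connector name and topic are placeholders, the ssl/keystore options that hostname verification builds on are omitted, and the sink connector class name is assumed from the usual camel-kafka-connector naming pattern):

name=netty-http-sink-example
connector.class=org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSinkConnector
topics=netty-http-sink-topic
# added by this change: enable hostname verification on the SSLEngine for this endpoint
camel.sink.endpoint.hostnameVerification=true
# component-level variant added by the same change, acting as the default for endpoints of this component
camel.component.netty-http.hostnameVerification=true
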
diff --git a/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-sink.json b/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-sink.json
index 04d72ab..7bcdce9 100644
--- a/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-sink.json
+++ b/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-sink.json
@@ -160,6 +160,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.sink.endpoint.hostnameVerification": {
+			"name": "camel.sink.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.sink.endpoint.allowSerializedHeaders": {
 			"name": "camel.sink.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -506,6 +513,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty.hostnameVerification": {
+			"name": "camel.component.netty.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty.allowSerializedHeaders": {
 			"name": "camel.component.netty.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-source.json b/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-source.json
index 6a7e1ba..171e4aa 100644
--- a/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-source.json
+++ b/connectors/camel-netty-kafka-connector/src/generated/resources/camel-netty-source.json
@@ -221,6 +221,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.source.endpoint.hostnameVerification": {
+			"name": "camel.source.endpoint.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.allowSerializedHeaders": {
 			"name": "camel.source.endpoint.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
@@ -630,6 +637,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.netty.hostnameVerification": {
+			"name": "camel.component.netty.hostnameVerification",
+			"description": "To enable\/disable hostname verification on SSLEngine",
+			"defaultValue": "false",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.netty.allowSerializedHeaders": {
 			"name": "camel.component.netty.allowSerializedHeaders",
 			"description": "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.",
diff --git a/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-sink-connector.adoc b/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-sink-connector.adoc
index 39d68f8..3cd0dc3 100644
--- a/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-sink-connector.adoc
+++ b/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.netty.CamelNettySinkConnector
 ----
 
 
-The camel-netty sink connector supports 107 options, which are listed below.
+The camel-netty sink connector supports 109 options, which are listed below.
 
 
 
@@ -53,6 +53,7 @@ The camel-netty sink connector supports 107 options, which are listed below.
 | *camel.sink.endpoint.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.sink.endpoint.udpConnectionlessSending* | This option supports connection less udp sending which is a real fire and forget. A connected udp send receive the PortUnreachableException if no one is listen on the receiving port. | false | false | MEDIUM
 | *camel.sink.endpoint.useByteBuf* | If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out. | false | false | MEDIUM
+| *camel.sink.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.sink.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.sink.endpoint.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
 | *camel.sink.endpoint.nativeTransport* | Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms. You need to add the netty JAR for the host operating system you are using. See more details at: \http://netty.io/wiki/native-transports.html | false | false | MEDIUM
@@ -105,6 +106,7 @@ The camel-netty sink connector supports 107 options, which are listed below.
 | *camel.component.netty.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.component.netty.udpConnectionlessSending* | This option supports connection less udp sending which is a real fire and forget. A connected udp send receive the PortUnreachableException if no one is listen on the receiving port. | false | false | MEDIUM
 | *camel.component.netty.useByteBuf* | If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out. | false | false | MEDIUM
+| *camel.component.netty.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
diff --git a/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-source-connector.adoc b/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-source-connector.adoc
index ad91475..07a8199 100644
--- a/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-source-connector.adoc
+++ b/connectors/camel-netty-kafka-connector/src/main/docs/camel-netty-kafka-source-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.netty.CamelNettySourceConnector
 ----
 
 
-The camel-netty source connector supports 119 options, which are listed below.
+The camel-netty source connector supports 121 options, which are listed below.
 
 
 
@@ -58,6 +58,7 @@ The camel-netty source connector supports 119 options, which are listed below.
 | *camel.source.endpoint.serverExceptionCaughtLog Level* | If the server (NettyConsumer) catches an exception then its logged using this logging level. One of: [TRACE] [DEBUG] [INFO] [WARN] [ERROR] [OFF] | "WARN" | false | MEDIUM
 | *camel.source.endpoint.serverInitializerFactory* | To use a custom ServerInitializerFactory | null | false | MEDIUM
 | *camel.source.endpoint.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.source.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.source.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.source.endpoint.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
 | *camel.source.endpoint.nativeTransport* | Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms. You need to add the netty JAR for the host operating system you are using. See more details at: \http://netty.io/wiki/native-transports.html | false | false | MEDIUM
@@ -116,6 +117,7 @@ The camel-netty source connector supports 119 options, which are listed below.
 | *camel.component.netty.serverExceptionCaughtLog Level* | If the server (NettyConsumer) catches an exception then its logged using this logging level. One of: [TRACE] [DEBUG] [INFO] [WARN] [ERROR] [OFF] | "WARN" | false | MEDIUM
 | *camel.component.netty.serverInitializerFactory* | To use a custom ServerInitializerFactory | null | false | MEDIUM
 | *camel.component.netty.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.component.netty.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty.channelGroup* | To use a explicit ChannelGroup. | null | false | MEDIUM
diff --git a/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySinkConnectorConfig.java b/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySinkConnectorConfig.java
index 1cc2d98..1575e80 100644
--- a/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySinkConnectorConfig.java
+++ b/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySinkConnectorConfig.java
@@ -90,6 +90,9 @@ public class CamelNettySinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_NETTY_ENDPOINT_USE_BYTE_BUF_CONF = "camel.sink.endpoint.useByteBuf";
     public static final String CAMEL_SINK_NETTY_ENDPOINT_USE_BYTE_BUF_DOC = "If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out.";
     public static final Boolean CAMEL_SINK_NETTY_ENDPOINT_USE_BYTE_BUF_DEFAULT = false;
+    public static final String CAMEL_SINK_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_CONF = "camel.sink.endpoint.hostnameVerification";
+    public static final String CAMEL_SINK_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SINK_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SINK_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.sink.endpoint.allowSerializedHeaders";
     public static final String CAMEL_SINK_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SINK_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -246,6 +249,9 @@ public class CamelNettySinkConnectorConfig extends CamelSinkConnectorConfig {
     public static final String CAMEL_SINK_NETTY_COMPONENT_USE_BYTE_BUF_CONF = "camel.component.netty.useByteBuf";
     public static final String CAMEL_SINK_NETTY_COMPONENT_USE_BYTE_BUF_DOC = "If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out.";
     public static final Boolean CAMEL_SINK_NETTY_COMPONENT_USE_BYTE_BUF_DEFAULT = false;
+    public static final String CAMEL_SINK_NETTY_COMPONENT_HOSTNAME_VERIFICATION_CONF = "camel.component.netty.hostnameVerification";
+    public static final String CAMEL_SINK_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SINK_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SINK_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.component.netty.allowSerializedHeaders";
     public static final String CAMEL_SINK_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SINK_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -380,6 +386,7 @@ public class CamelNettySinkConnectorConfig extends CamelSinkConnectorConfig {
         conf.define(CAMEL_SINK_NETTY_ENDPOINT_PRODUCER_POOL_MIN_IDLE_CONF, ConfigDef.Type.INT, CAMEL_SINK_NETTY_ENDPOINT_PRODUCER_POOL_MIN_IDLE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_ENDPOINT_PRODUCER_POOL_MIN_IDLE_DOC);
         conf.define(CAMEL_SINK_NETTY_ENDPOINT_UDP_CONNECTIONLESS_SENDING_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_ENDPOINT_UDP_CONNECTIONLESS_SENDING_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_ENDPOINT_UDP_CONNECTIONLESS_SENDING_DOC);
         conf.define(CAMEL_SINK_NETTY_ENDPOINT_USE_BYTE_BUF_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_ENDPOINT_USE_BYTE_BUF_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_ENDPOINT_USE_BYTE_BUF_DOC);
+        conf.define(CAMEL_SINK_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SINK_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SINK_NETTY_ENDPOINT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SINK_NETTY_ENDPOINT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_ENDPOINT_CHANNEL_GROUP_DOC);
         conf.define(CAMEL_SINK_NETTY_ENDPOINT_NATIVE_TRANSPORT_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_ENDPOINT_NATIVE_TRANSPORT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_ENDPOINT_NATIVE_TRANSPORT_DOC);
@@ -432,6 +439,7 @@ public class CamelNettySinkConnectorConfig extends CamelSinkConnectorConfig {
         conf.define(CAMEL_SINK_NETTY_COMPONENT_PRODUCER_POOL_MIN_IDLE_CONF, ConfigDef.Type.INT, CAMEL_SINK_NETTY_COMPONENT_PRODUCER_POOL_MIN_IDLE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_COMPONENT_PRODUCER_POOL_MIN_IDLE_DOC);
         conf.define(CAMEL_SINK_NETTY_COMPONENT_UDP_CONNECTIONLESS_SENDING_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_COMPONENT_UDP_CONNECTIONLESS_SENDING_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_COMPONENT_UDP_CONNECTIONLESS_SENDING_DOC);
         conf.define(CAMEL_SINK_NETTY_COMPONENT_USE_BYTE_BUF_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_COMPONENT_USE_BYTE_BUF_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_COMPONENT_USE_BYTE_BUF_DOC);
+        conf.define(CAMEL_SINK_NETTY_COMPONENT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SINK_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SINK_NETTY_COMPONENT_AUTOWIRED_ENABLED_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_NETTY_COMPONENT_AUTOWIRED_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_COMPONENT_AUTOWIRED_ENABLED_DOC);
         conf.define(CAMEL_SINK_NETTY_COMPONENT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SINK_NETTY_COMPONENT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_NETTY_COMPONENT_CHANNEL_GROUP_DOC);
diff --git a/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySourceConnectorConfig.java b/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySourceConnectorConfig.java
index cb72ef4..bcb1a1c 100644
--- a/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySourceConnectorConfig.java
+++ b/connectors/camel-netty-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/netty/CamelNettySourceConnectorConfig.java
@@ -107,6 +107,9 @@ public class CamelNettySourceConnectorConfig
     public static final String CAMEL_SOURCE_NETTY_ENDPOINT_USING_EXECUTOR_SERVICE_CONF = "camel.source.endpoint.usingExecutorService";
     public static final String CAMEL_SOURCE_NETTY_ENDPOINT_USING_EXECUTOR_SERVICE_DOC = "Whether to use ordered thread pool, to ensure events are processed orderly on the same channel.";
     public static final Boolean CAMEL_SOURCE_NETTY_ENDPOINT_USING_EXECUTOR_SERVICE_DEFAULT = true;
+    public static final String CAMEL_SOURCE_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_CONF = "camel.source.endpoint.hostnameVerification";
+    public static final String CAMEL_SOURCE_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SOURCE_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SOURCE_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.source.endpoint.allowSerializedHeaders";
     public static final String CAMEL_SOURCE_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SOURCE_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -281,6 +284,9 @@ public class CamelNettySourceConnectorConfig
     public static final String CAMEL_SOURCE_NETTY_COMPONENT_USING_EXECUTOR_SERVICE_CONF = "camel.component.netty.usingExecutorService";
     public static final String CAMEL_SOURCE_NETTY_COMPONENT_USING_EXECUTOR_SERVICE_DOC = "Whether to use ordered thread pool, to ensure events are processed orderly on the same channel.";
     public static final Boolean CAMEL_SOURCE_NETTY_COMPONENT_USING_EXECUTOR_SERVICE_DEFAULT = true;
+    public static final String CAMEL_SOURCE_NETTY_COMPONENT_HOSTNAME_VERIFICATION_CONF = "camel.component.netty.hostnameVerification";
+    public static final String CAMEL_SOURCE_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DOC = "To enable/disable hostname verification on SSLEngine";
+    public static final Boolean CAMEL_SOURCE_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT = false;
     public static final String CAMEL_SOURCE_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF = "camel.component.netty.allowSerializedHeaders";
     public static final String CAMEL_SOURCE_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC = "Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level.";
     public static final Boolean CAMEL_SOURCE_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT = false;
@@ -423,6 +429,7 @@ public class CamelNettySourceConnectorConfig
         conf.define(CAMEL_SOURCE_NETTY_ENDPOINT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTY_ENDPOINT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_ENDPOINT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_DOC);
         conf.define(CAMEL_SOURCE_NETTY_ENDPOINT_SERVER_INITIALIZER_FACTORY_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTY_ENDPOINT_SERVER_INITIALIZER_FACTORY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_ENDPOINT_SERVER_INITIALIZER_FACTORY_DOC);
         conf.define(CAMEL_SOURCE_NETTY_ENDPOINT_USING_EXECUTOR_SERVICE_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_ENDPOINT_USING_EXECUTOR_SERVICE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_ENDPOINT_USING_EXECUTOR_SERVICE_DOC);
+        conf.define(CAMEL_SOURCE_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_ENDPOINT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SOURCE_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_ENDPOINT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SOURCE_NETTY_ENDPOINT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTY_ENDPOINT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_ENDPOINT_CHANNEL_GROUP_DOC);
         conf.define(CAMEL_SOURCE_NETTY_ENDPOINT_NATIVE_TRANSPORT_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_ENDPOINT_NATIVE_TRANSPORT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_ENDPOINT_NATIVE_TRANSPORT_DOC);
@@ -481,6 +488,7 @@ public class CamelNettySourceConnectorConfig
         conf.define(CAMEL_SOURCE_NETTY_COMPONENT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTY_COMPONENT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_COMPONENT_SERVER_EXCEPTION_CAUGHT_LOG_LEVEL_DOC);
         conf.define(CAMEL_SOURCE_NETTY_COMPONENT_SERVER_INITIALIZER_FACTORY_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTY_COMPONENT_SERVER_INITIALIZER_FACTORY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_COMPONENT_SERVER_INITIALIZER_FACTORY_DOC);
         conf.define(CAMEL_SOURCE_NETTY_COMPONENT_USING_EXECUTOR_SERVICE_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_COMPONENT_USING_EXECUTOR_SERVICE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_COMPONENT_USING_EXECUTOR_SERVICE_DOC);
+        conf.define(CAMEL_SOURCE_NETTY_COMPONENT_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_COMPONENT_HOSTNAME_VERIFICATION_DOC);
         conf.define(CAMEL_SOURCE_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_COMPONENT_ALLOW_SERIALIZED_HEADERS_DOC);
         conf.define(CAMEL_SOURCE_NETTY_COMPONENT_AUTOWIRED_ENABLED_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_NETTY_COMPONENT_AUTOWIRED_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_COMPONENT_AUTOWIRED_ENABLED_DOC);
         conf.define(CAMEL_SOURCE_NETTY_COMPONENT_CHANNEL_GROUP_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_NETTY_COMPONENT_CHANNEL_GROUP_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_NETTY_COMPONENT_CHANNEL_GROUP_DOC);
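
The plain camel-netty connectors gain the same flag. A minimal, hypothetical sketch for the sink connector (the connector class comes from the generated docs above; the topic name is a placeholder and the SSL-related options the flag depends on are omitted):

name=netty-sink-example
connector.class=org.apache.camel.kafkaconnector.netty.CamelNettySinkConnector
topics=netty-sink-topic
# added by this change: enable hostname verification on the SSLEngine
camel.sink.endpoint.hostnameVerification=true
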
diff --git a/connectors/camel-spring-rabbitmq-kafka-connector/src/generated/resources/camel-spring-rabbitmq-source.json b/connectors/camel-spring-rabbitmq-kafka-connector/src/generated/resources/camel-spring-rabbitmq-source.json
index adbe1f5..309945c 100644
--- a/connectors/camel-spring-rabbitmq-kafka-connector/src/generated/resources/camel-spring-rabbitmq-source.json
+++ b/connectors/camel-spring-rabbitmq-kafka-connector/src/generated/resources/camel-spring-rabbitmq-source.json
@@ -144,6 +144,12 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.source.endpoint.concurrentConsumers": {
+			"name": "camel.source.endpoint.concurrentConsumers",
+			"description": "The number of consumers",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.source.endpoint.exceptionHandler": {
 			"name": "camel.source.endpoint.exceptionHandler",
 			"description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.",
@@ -161,6 +167,23 @@
 				"InOptionalOut"
 			]
 		},
+		"camel.source.endpoint.maxConcurrentConsumers": {
+			"name": "camel.source.endpoint.maxConcurrentConsumers",
+			"description": "The maximum number of consumers (available only with SMLC)",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
+		"camel.source.endpoint.messageListenerContainerType": {
+			"name": "camel.source.endpoint.messageListenerContainerType",
+			"description": "The type of the MessageListenerContainer One of: [DMLC] [SMLC]",
+			"defaultValue": "\"DMLC\"",
+			"priority": "MEDIUM",
+			"required": "false",
+			"enum": [
+				"DMLC",
+				"SMLC"
+			]
+		},
 		"camel.source.endpoint.prefetchCount": {
 			"name": "camel.source.endpoint.prefetchCount",
 			"description": "Tell the broker how many messages to send in a single request. Often this can be set quite high to improve throughput.",
@@ -263,6 +286,13 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.spring-rabbitmq.concurrentConsumers": {
+			"name": "camel.component.spring-rabbitmq.concurrentConsumers",
+			"description": "The number of consumers",
+			"defaultValue": "1",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
 		"camel.component.spring-rabbitmq.errorHandler": {
 			"name": "camel.component.spring-rabbitmq.errorHandler",
 			"description": "To use a custom ErrorHandler for handling exceptions from the message listener (consumer)",
@@ -275,6 +305,23 @@
 			"priority": "MEDIUM",
 			"required": "false"
 		},
+		"camel.component.spring-rabbitmq.maxConcurrentConsumers": {
+			"name": "camel.component.spring-rabbitmq.maxConcurrentConsumers",
+			"description": "The maximum number of consumers (available only with SMLC)",
+			"priority": "MEDIUM",
+			"required": "false"
+		},
+		"camel.component.spring-rabbitmq.messageListenerContainerType": {
+			"name": "camel.component.spring-rabbitmq.messageListenerContainerType",
+			"description": "The type of the MessageListenerContainer One of: [DMLC] [SMLC]",
+			"defaultValue": "\"DMLC\"",
+			"priority": "MEDIUM",
+			"required": "false",
+			"enum": [
+				"DMLC",
+				"SMLC"
+			]
+		},
 		"camel.component.spring-rabbitmq.prefetchCount": {
 			"name": "camel.component.spring-rabbitmq.prefetchCount",
 			"description": "Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.",
diff --git a/connectors/camel-spring-rabbitmq-kafka-connector/src/main/docs/camel-spring-rabbitmq-kafka-source-connector.adoc b/connectors/camel-spring-rabbitmq-kafka-connector/src/main/docs/camel-spring-rabbitmq-kafka-source-connector.adoc
index c3dcc6a..467a171 100644
--- a/connectors/camel-spring-rabbitmq-kafka-connector/src/main/docs/camel-spring-rabbitmq-kafka-source-connector.adoc
+++ b/connectors/camel-spring-rabbitmq-kafka-connector/src/main/docs/camel-spring-rabbitmq-kafka-source-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.springrabbitmq.CamelSpringrabbit
 ----
 
 
-The camel-spring-rabbitmq source connector supports 44 options, which are listed below.
+The camel-spring-rabbitmq source connector supports 50 options, which are listed below.
 
 
 
@@ -49,8 +49,11 @@ The camel-spring-rabbitmq source connector supports 44 options, which are listed
 | *camel.source.endpoint.exclusive* | Set to true for an exclusive consumer | false | false | MEDIUM
 | *camel.source.endpoint.noLocal* | Set to true for an no-local consumer | false | false | MEDIUM
 | *camel.source.endpoint.queues* | The queue(s) to use for consuming messages. Multiple queue names can be separated by comma. If none has been configured then Camel will generate an unique id as the queue name for the consumer. | null | false | MEDIUM
+| *camel.source.endpoint.concurrentConsumers* | The number of consumers | null | false | MEDIUM
 | *camel.source.endpoint.exceptionHandler* | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | null | false | MEDIUM
 | *camel.source.endpoint.exchangePattern* | Sets the exchange pattern when the consumer creates an exchange. One of: [InOnly] [InOut] [InOptionalOut] | null | false | MEDIUM
+| *camel.source.endpoint.maxConcurrentConsumers* | The maximum number of consumers (available only with SMLC) | null | false | MEDIUM
+| *camel.source.endpoint.messageListenerContainerType* | The type of the MessageListenerContainer One of: [DMLC] [SMLC] | "DMLC" | false | MEDIUM
 | *camel.source.endpoint.prefetchCount* | Tell the broker how many messages to send in a single request. Often this can be set quite high to improve throughput. | null | false | MEDIUM
 | *camel.source.endpoint.args* | Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.consumer. arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dlq.queue. arg.dlq.binding. For example to declare a queue with message ttl argument: args=arg.queue.x-message-ttl=60000 | null | false | MEDIUM
 | *camel.source.endpoint.messageConverter* | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | null | false | MEDIUM
@@ -66,8 +69,11 @@ The camel-spring-rabbitmq source connector supports 44 options, which are listed
 | *camel.component.spring-rabbitmq.deadLetterExchange Type* | The type of the dead letter exchange One of: [direct] [fanout] [headers] [topic] | "direct" | false | MEDIUM
 | *camel.component.spring-rabbitmq.deadLetterQueue* | The name of the dead letter queue | null | false | MEDIUM
 | *camel.component.spring-rabbitmq.deadLetterRouting Key* | The routing key for the dead letter exchange | null | false | MEDIUM
+| *camel.component.spring-rabbitmq.concurrent Consumers* | The number of consumers | 1 | false | MEDIUM
 | *camel.component.spring-rabbitmq.errorHandler* | To use a custom ErrorHandler for handling exceptions from the message listener (consumer) | null | false | MEDIUM
 | *camel.component.spring-rabbitmq.listenerContainer Factory* | To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages | null | false | MEDIUM
+| *camel.component.spring-rabbitmq.maxConcurrent Consumers* | The maximum number of consumers (available only with SMLC) | null | false | MEDIUM
+| *camel.component.spring-rabbitmq.messageListener ContainerType* | The type of the MessageListenerContainer One of: [DMLC] [SMLC] | "DMLC" | false | MEDIUM
 | *camel.component.spring-rabbitmq.prefetchCount* | Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. | 250 | false | MEDIUM
 | *camel.component.spring-rabbitmq.shutdownTimeout* | The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. | 5000L | false | MEDIUM
 | *camel.component.spring-rabbitmq.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
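
Together the new spring-rabbitmq options let a source connector scale its consumers: concurrentConsumers sets the baseline, maxConcurrentConsumers the ceiling (honoured only by the simple container, per the option description), and messageListenerContainerType switches between DMLC and SMLC. A minimal, hypothetical source configuration sketch (the connector class name is assumed from the naming pattern; exchange wiring and the remaining required options are omitted; topics is the camel-kafka-connector property naming the Kafka topic the source writes to):

connector.class=org.apache.camel.kafkaconnector.springrabbitmq.CamelSpringrabbitmqSourceConnector
topics=rabbitmq-source-topic
camel.source.endpoint.queues=orders
# added by this change: use the simple container so maxConcurrentConsumers takes effect
camel.source.endpoint.messageListenerContainerType=SMLC
camel.source.endpoint.concurrentConsumers=2
camel.source.endpoint.maxConcurrentConsumers=5
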
diff --git a/connectors/camel-spring-rabbitmq-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/springrabbitmq/CamelSpringrabbitmqSourceConnectorConfig.java b/connectors/camel-spring-rabbitmq-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/springrabbitmq/CamelSpringrabbitmqSourceConnectorConfig.java
index 7fd9c41..d6cb6c9 100644
--- a/connectors/camel-spring-rabbitmq-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/springrabbitmq/CamelSpringrabbitmqSourceConnectorConfig.java
+++ b/connectors/camel-spring-rabbitmq-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/springrabbitmq/CamelSpringrabbitmqSourceConnectorConfig.java
@@ -80,12 +80,21 @@ public class CamelSpringrabbitmqSourceConnectorConfig
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_QUEUES_CONF = "camel.source.endpoint.queues";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_QUEUES_DOC = "The queue(s) to use for consuming messages. Multiple queue names can be separated by comma. If none has been configured then Camel will generate an unique id as the queue name for the consumer.";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_QUEUES_DEFAULT = null;
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_CONCURRENT_CONSUMERS_CONF = "camel.source.endpoint.concurrentConsumers";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_CONCURRENT_CONSUMERS_DOC = "The number of consumers";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_CONCURRENT_CONSUMERS_DEFAULT = null;
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCEPTION_HANDLER_CONF = "camel.source.endpoint.exceptionHandler";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCEPTION_HANDLER_DOC = "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCEPTION_HANDLER_DEFAULT = null;
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCHANGE_PATTERN_CONF = "camel.source.endpoint.exchangePattern";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCHANGE_PATTERN_DOC = "Sets the exchange pattern when the consumer creates an exchange. One of: [InOnly] [InOut] [InOptionalOut]";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCHANGE_PATTERN_DEFAULT = null;
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MAX_CONCURRENT_CONSUMERS_CONF = "camel.source.endpoint.maxConcurrentConsumers";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MAX_CONCURRENT_CONSUMERS_DOC = "The maximum number of consumers (available only with SMLC)";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MAX_CONCURRENT_CONSUMERS_DEFAULT = null;
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_LISTENER_CONTAINER_TYPE_CONF = "camel.source.endpoint.messageListenerContainerType";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_LISTENER_CONTAINER_TYPE_DOC = "The type of the MessageListenerContainer One of: [DMLC] [SMLC]";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_LISTENER_CONTAINER_TYPE_DEFAULT = "DMLC";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_PREFETCH_COUNT_CONF = "camel.source.endpoint.prefetchCount";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_PREFETCH_COUNT_DOC = "Tell the broker how many messages to send in a single request. Often this can be set quite high to improve throughput.";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_PREFETCH_COUNT_DEFAULT = null;
@@ -131,12 +140,21 @@ public class CamelSpringrabbitmqSourceConnectorConfig
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_ROUTING_KEY_CONF = "camel.component.spring-rabbitmq.deadLetterRoutingKey";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_ROUTING_KEY_DOC = "The routing key for the dead letter exchange";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_ROUTING_KEY_DEFAULT = null;
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_CONCURRENT_CONSUMERS_CONF = "camel.component.spring-rabbitmq.concurrentConsumers";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_CONCURRENT_CONSUMERS_DOC = "The number of consumers";
+    public static final Integer CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_CONCURRENT_CONSUMERS_DEFAULT = 1;
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_ERROR_HANDLER_CONF = "camel.component.spring-rabbitmq.errorHandler";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_ERROR_HANDLER_DOC = "To use a custom ErrorHandler for handling exceptions from the message listener (consumer)";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_ERROR_HANDLER_DEFAULT = null;
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_LISTENER_CONTAINER_FACTORY_CONF = "camel.component.spring-rabbitmq.listenerContainerFactory";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_LISTENER_CONTAINER_FACTORY_DOC = "To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_LISTENER_CONTAINER_FACTORY_DEFAULT = null;
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MAX_CONCURRENT_CONSUMERS_CONF = "camel.component.spring-rabbitmq.maxConcurrentConsumers";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MAX_CONCURRENT_CONSUMERS_DOC = "The maximum number of consumers (available only with SMLC)";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MAX_CONCURRENT_CONSUMERS_DEFAULT = null;
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MESSAGE_LISTENER_CONTAINER_TYPE_CONF = "camel.component.spring-rabbitmq.messageListenerContainerType";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MESSAGE_LISTENER_CONTAINER_TYPE_DOC = "The type of the MessageListenerContainer One of: [DMLC] [SMLC]";
+    public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MESSAGE_LISTENER_CONTAINER_TYPE_DEFAULT = "DMLC";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_PREFETCH_COUNT_CONF = "camel.component.spring-rabbitmq.prefetchCount";
     public static final String CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_PREFETCH_COUNT_DOC = "Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.";
     public static final Integer CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_PREFETCH_COUNT_DEFAULT = 250;
@@ -190,8 +208,11 @@ public class CamelSpringrabbitmqSourceConnectorConfig
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCLUSIVE_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCLUSIVE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCLUSIVE_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_NO_LOCAL_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_NO_LOCAL_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_NO_LOCAL_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_QUEUES_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_QUEUES_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_QUEUES_DOC);
+        conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_CONCURRENT_CONSUMERS_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_CONCURRENT_CONSUMERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_CONCURRENT_CONSUMERS_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCEPTION_HANDLER_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCEPTION_HANDLER_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCEPTION_HANDLER_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCHANGE_PATTERN_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCHANGE_PATTERN_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_EXCHANGE_PATTERN_DOC);
+        conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MAX_CONCURRENT_CONSUMERS_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MAX_CONCURRENT_CONSUMERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MAX_CONCURRENT_CONSUMERS_DOC);
+        conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_LISTENER_CONTAINER_TYPE_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_LISTENER_CONTAINER_TYPE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_LISTENER_CONTAINER_TYPE_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_PREFETCH_COUNT_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_PREFETCH_COUNT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_PREFETCH_COUNT_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_ARGS_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_ARGS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_ARGS_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_CONVERTER_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_CONVERTER_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_ENDPOINT_MESSAGE_CONVERTER_DOC);
@@ -207,8 +228,11 @@ public class CamelSpringrabbitmqSourceConnectorConfig
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_EXCHANGE_TYPE_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_EXCHANGE_TYPE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_EXCHANGE_TYPE_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_QUEUE_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_QUEUE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_QUEUE_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_ROUTING_KEY_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_ROUTING_KEY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_DEAD_LETTER_ROUTING_KEY_DOC);
+        conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_CONCURRENT_CONSUMERS_CONF, ConfigDef.Type.INT, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_CONCURRENT_CONSUMERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_CONCURRENT_CONSUMERS_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_ERROR_HANDLER_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_ERROR_HANDLER_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_ERROR_HANDLER_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_LISTENER_CONTAINER_FACTORY_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_LISTENER_CONTAINER_FACTORY_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_LISTENER_CONTAINER_FACTORY_DOC);
+        conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MAX_CONCURRENT_CONSUMERS_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MAX_CONCURRENT_CONSUMERS_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MAX_CONCURRENT_CONSUMERS_DOC);
+        conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MESSAGE_LISTENER_CONTAINER_TYPE_CONF, ConfigDef.Type.STRING, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MESSAGE_LISTENER_CONTAINER_TYPE_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_MESSAGE_LISTENER_CONTAINER_TYPE_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_PREFETCH_COUNT_CONF, ConfigDef.Type.INT, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_PREFETCH_COUNT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_PREFETCH_COUNT_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_SHUTDOWN_TIMEOUT_CONF, ConfigDef.Type.LONG, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_SHUTDOWN_TIMEOUT_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_SHUTDOWN_TIMEOUT_DOC);
         conf.define(CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_AUTOWIRED_ENABLED_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_AUTOWIRED_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SOURCE_SPRINGRABBITMQ_COMPONENT_AUTOWIRED_ENABLED_DOC);
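
To illustrate the options added above, a source connector configuration could opt into the SMLC container type and bound its consumer pool roughly as follows (the property keys come from the CONF constants above; the numeric values are examples only):

----
# sketch only - maxConcurrentConsumers is honoured only with the SMLC container type
camel.source.endpoint.messageListenerContainerType=SMLC
camel.source.endpoint.concurrentConsumers=2
camel.source.endpoint.maxConcurrentConsumers=5
----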
diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc
index f8d3083..dbaeb55 100644
--- a/docs/modules/ROOT/nav.adoc
+++ b/docs/modules/ROOT/nav.adoc
@@ -70,6 +70,50 @@
 ** camel-avro-kafka-connector
 *** xref:connectors/camel-avro-kafka-source-connector.adoc[Source Docs]
 *** xref:connectors/camel-avro-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-cw-kafka-connector
+*** xref:connectors/camel-aws-cw-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ddb-kafka-connector
+*** xref:connectors/camel-aws-ddb-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ddbstream-kafka-connector
+*** xref:connectors/camel-aws-ddbstream-kafka-source-connector.adoc[Source Docs]
+** camel-aws-ec2-kafka-connector
+*** xref:connectors/camel-aws-ec2-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ecs-kafka-connector
+*** xref:connectors/camel-aws-ecs-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-eks-kafka-connector
+*** xref:connectors/camel-aws-eks-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-iam-kafka-connector
+*** xref:connectors/camel-aws-iam-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-kinesis-firehose-kafka-connector
+*** xref:connectors/camel-aws-kinesis-firehose-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-kinesis-kafka-connector
+*** xref:connectors/camel-aws-kinesis-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-kinesis-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-kms-kafka-connector
+*** xref:connectors/camel-aws-kms-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-lambda-kafka-connector
+*** xref:connectors/camel-aws-lambda-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-mq-kafka-connector
+*** xref:connectors/camel-aws-mq-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-msk-kafka-connector
+*** xref:connectors/camel-aws-msk-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-s3-kafka-connector
+*** xref:connectors/camel-aws-s3-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-s3-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-sdb-kafka-connector
+*** xref:connectors/camel-aws-sdb-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ses-kafka-connector
+*** xref:connectors/camel-aws-ses-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-sns-kafka-connector
+*** xref:connectors/camel-aws-sns-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-sqs-kafka-connector
+*** xref:connectors/camel-aws-sqs-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-sqs-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-swf-kafka-connector
+*** xref:connectors/camel-aws-swf-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-swf-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-translate-kafka-connector
+*** xref:connectors/camel-aws-translate-kafka-sink-connector.adoc[Sink Docs]
 ** camel-aws2-athena-kafka-connector
 *** xref:connectors/camel-aws2-athena-kafka-sink-connector.adoc[Sink Docs]
 ** camel-aws2-cw-kafka-connector
@@ -115,9 +159,15 @@
 *** xref:connectors/camel-aws2-sts-kafka-sink-connector.adoc[Sink Docs]
 ** camel-aws2-translate-kafka-connector
 *** xref:connectors/camel-aws2-translate-kafka-sink-connector.adoc[Sink Docs]
+** camel-azure-blob-kafka-connector
+*** xref:connectors/camel-azure-blob-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-azure-blob-kafka-sink-connector.adoc[Sink Docs]
 ** camel-azure-eventhubs-kafka-connector
 *** xref:connectors/camel-azure-eventhubs-kafka-source-connector.adoc[Source Docs]
 *** xref:connectors/camel-azure-eventhubs-kafka-sink-connector.adoc[Sink Docs]
+** camel-azure-queue-kafka-connector
+*** xref:connectors/camel-azure-queue-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-azure-queue-kafka-sink-connector.adoc[Sink Docs]
 ** camel-azure-storage-blob-kafka-connector
 *** xref:connectors/camel-azure-storage-blob-kafka-source-connector.adoc[Source Docs]
 *** xref:connectors/camel-azure-storage-blob-kafka-sink-connector.adoc[Sink Docs]
diff --git a/docs/modules/ROOT/pages/connectors.adoc b/docs/modules/ROOT/pages/connectors.adoc
index 4e7893c..2e11eaf 100644
--- a/docs/modules/ROOT/pages/connectors.adoc
+++ b/docs/modules/ROOT/pages/connectors.adoc
@@ -2,7 +2,7 @@
 = Supported connectors and documentation
 
 // kafka-connectors list: START
-Number of Camel Kafka connectors: 335 
+Number of Camel Kafka connectors: 357 
 
 [width="100%",cols="4,1,1,1,1,1",options="header"]
 |===
@@ -27,6 +27,26 @@ Number of Camel Kafka connectors: 335
 | *camel-atomix-set-kafka-connector* | true | true | xref:connectors/camel-atomix-set-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-atomix-set-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-atomix-set-kafka-connector/0.7.2/camel-atomix-set-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-atomix-value-kafka-connector* | true | true | xref:connectors/camel-atomix-value-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-atomix-value-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-atomix-value-kafka-connector/0.7.2/camel-atomix-value-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-avro-kafka-connector* | true | true | xref:connectors/camel-avro-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-avro-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-avro-kafka-connector/0.7.2/camel-avro-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-cw-kafka-connector* | true | false | xref:connectors/camel-aws-cw-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-cw-kafka-connector/0.7.2/camel-aws-cw-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-ddb-kafka-connector* | true | false | xref:connectors/camel-aws-ddb-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ddb-kafka-connector/0.7.2/camel-aws-ddb-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-ddbstream-kafka-connector* | false | true |  | xref:connectors/camel-aws-ddbstream-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ddbstream-kafka-connector/0.7.2/camel-aws-ddbstream-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-ec2-kafka-connector* | true | false | xref:connectors/camel-aws-ec2-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ec2-kafka-connector/0.7.2/camel-aws-ec2-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-ecs-kafka-connector* | true | false | xref:connectors/camel-aws-ecs-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ecs-kafka-connector/0.7.2/camel-aws-ecs-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-eks-kafka-connector* | true | false | xref:connectors/camel-aws-eks-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-eks-kafka-connector/0.7.2/camel-aws-eks-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-iam-kafka-connector* | true | false | xref:connectors/camel-aws-iam-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-iam-kafka-connector/0.7.2/camel-aws-iam-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-kinesis-firehose-kafka-connector* | true | false | xref:connectors/camel-aws-kinesis-firehose-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-kinesis-firehose-kafka-connector/0.7.2/camel-aws-kinesis-firehose-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-kinesis-kafka-connector* | true | true | xref:connectors/camel-aws-kinesis-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-aws-kinesis-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-kinesis-kafka-connector/0.7.2/camel-aws-kinesis-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-kms-kafka-connector* | true | false | xref:connectors/camel-aws-kms-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-kms-kafka-connector/0.7.2/camel-aws-kms-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-lambda-kafka-connector* | true | false | xref:connectors/camel-aws-lambda-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-lambda-kafka-connector/0.7.2/camel-aws-lambda-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-mq-kafka-connector* | true | false | xref:connectors/camel-aws-mq-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-mq-kafka-connector/0.7.2/camel-aws-mq-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-msk-kafka-connector* | true | false | xref:connectors/camel-aws-msk-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-msk-kafka-connector/0.7.2/camel-aws-msk-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-s3-kafka-connector* | true | true | xref:connectors/camel-aws-s3-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-aws-s3-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.7.2/camel-aws-s3-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-sdb-kafka-connector* | true | false | xref:connectors/camel-aws-sdb-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-sdb-kafka-connector/0.7.2/camel-aws-sdb-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-ses-kafka-connector* | true | false | xref:connectors/camel-aws-ses-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ses-kafka-connector/0.7.2/camel-aws-ses-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-sns-kafka-connector* | true | false | xref:connectors/camel-aws-sns-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-sns-kafka-connector/0.7.2/camel-aws-sns-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-sqs-kafka-connector* | true | true | xref:connectors/camel-aws-sqs-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-aws-sqs-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-sqs-kafka-connector/0.7.2/camel-aws-sqs-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-swf-kafka-connector* | true | true | xref:connectors/camel-aws-swf-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-aws-swf-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-swf-kafka-connector/0.7.2/camel-aws-swf-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-aws-translate-kafka-connector* | true | false | xref:connectors/camel-aws-translate-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-translate-kafka-connector/0.7.2/camel-aws-translate-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-aws2-athena-kafka-connector* | true | false | xref:connectors/camel-aws2-athena-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-athena-kafka-connector/0.7.2/camel-aws2-athena-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-aws2-cw-kafka-connector* | true | false | xref:connectors/camel-aws2-cw-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-cw-kafka-connector/0.7.2/camel-aws2-cw-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-aws2-ddb-kafka-connector* | true | false | xref:connectors/camel-aws2-ddb-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-ddb-kafka-connector/0.7.2/camel-aws2-ddb-kafka-connector-0.7.2-package.tar.gz[Download]
@@ -48,7 +68,9 @@ Number of Camel Kafka connectors: 335
 | *camel-aws2-sqs-kafka-connector* | true | true | xref:connectors/camel-aws2-sqs-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-aws2-sqs-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-sqs-kafka-connector/0.7.2/camel-aws2-sqs-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-aws2-sts-kafka-connector* | true | false | xref:connectors/camel-aws2-sts-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-sts-kafka-connector/0.7.2/camel-aws2-sts-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-aws2-translate-kafka-connector* | true | false | xref:connectors/camel-aws2-translate-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-translate-kafka-connector/0.7.2/camel-aws2-translate-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-azure-blob-kafka-connector* | true | true | xref:connectors/camel-azure-blob-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-azure-blob-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-azure-blob-kafka-connector/0.7.2/camel-azure-blob-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-azure-eventhubs-kafka-connector* | true | true | xref:connectors/camel-azure-eventhubs-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-azure-eventhubs-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-azure-eventhubs-kafka-connector/0.7.2/camel-azure-eventhubs-kafka-connector-0.7.2-package.tar.gz[Download]
+| *camel-azure-queue-kafka-connector* | true | true | xref:connectors/camel-azure-queue-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-azure-queue-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-azure-queue-kafka-connector/0.7.2/camel-azure-queue-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-azure-storage-blob-kafka-connector* | true | true | xref:connectors/camel-azure-storage-blob-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-azure-storage-blob-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-azure-storage-blob-kafka-connector/0.7.2/camel-azure-storage-blob-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-azure-storage-datalake-kafka-connector* | true | true | xref:connectors/camel-azure-storage-datalake-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-azure-storage-datalake-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-azure-storage-datalake-kafka-connector/0.7.2/camel-azure-storage-datalake-kafka-connector-0.7.2-package.tar.gz[Download]
 | *camel-azure-storage-queue-kafka-connector* | true | true | xref:connectors/camel-azure-storage-queue-kafka-sink-connector.adoc[Sink Docs] | xref:connectors/camel-azure-storage-queue-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-azure-storage-queue-kafka-connector/0.7.2/camel-azure-storage-queue-kafka-connector-0.7.2-package.tar.gz[Download]
diff --git a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc
index 597aa87..1a99602 100644
--- a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-s3 sink connector supports 59 options, which are listed below.
 | *camel.sink.path.bucketNameOrArn* | Bucket name or ARN | null | true | HIGH
 | *camel.sink.endpoint.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.sink.endpoint.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.sink.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.sink.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.sink.endpoint.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.sink.endpoint.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
 | *camel.sink.endpoint.policy* | The policy for this queue to set in the com.amazonaws.services.s3.AmazonS3#setBucketPolicy() method. | null | false | MEDIUM
@@ -62,7 +62,7 @@ The camel-aws2-s3 sink connector supports 59 options, which are listed below.
 | *camel.sink.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.component.aws2-s3.configuration* | The component configuration | null | false | MEDIUM
 | *camel.component.aws2-s3.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-s3.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc
index 909c149..81fb275 100644
--- a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-s3 source connector supports 85 options, which are listed below.
 | *camel.source.path.bucketNameOrArn* | Bucket name or ARN | null | true | HIGH
 | *camel.source.endpoint.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.source.endpoint.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.source.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.source.endpoint.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.source.endpoint.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.source.endpoint.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
 | *camel.source.endpoint.policy* | The policy for this queue to set in the com.amazonaws.services.s3.AmazonS3#setBucketPolicy() method. | null | false | MEDIUM
@@ -85,7 +85,7 @@ The camel-aws2-s3 source connector supports 85 options, which are listed below.
 | *camel.source.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Client* | Reference to a com.amazonaws.services.s3.AmazonS3 in the registry. | null | false | MEDIUM
 | *camel.component.aws2-s3.amazonS3Presigner* | An S3 Presigner for Request, used mainly in createDownloadLink operation | null | false | MEDIUM
-| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | true | false | MEDIUM
+| *camel.component.aws2-s3.autoCreateBucket* | Setting the autocreation of the S3 bucket bucketName. This will apply also in case of moveAfterRead option enabled and it will create the destinationBucket if it doesn't exist already. | false | false | MEDIUM
 | *camel.component.aws2-s3.configuration* | The component configuration | null | false | MEDIUM
 | *camel.component.aws2-s3.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-s3.pojoRequest* | If we want to use a POJO request as body or not | false | false | MEDIUM
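
Because the autoCreateBucket default flips from true to false in the tables above, a connector that still relies on bucket auto-creation now has to opt in explicitly; a minimal sketch using the documented key:

----
# re-enable bucket auto-creation now that the default is false
camel.component.aws2-s3.autoCreateBucket=true
----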
diff --git a/docs/modules/ROOT/pages/connectors/camel-aws2-sns-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws2-sns-kafka-sink-connector.adoc
index df40bba..f36dc00 100644
--- a/docs/modules/ROOT/pages/connectors/camel-aws2-sns-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-aws2-sns-kafka-sink-connector.adoc
@@ -33,7 +33,7 @@ The camel-aws2-sns sink connector supports 48 options, which are listed below.
 | Name | Description | Default | Required | Priority
 | *camel.sink.path.topicNameOrArn* | Topic name or ARN | null | true | HIGH
 | *camel.sink.endpoint.amazonSNSClient* | To use the AmazonSNS as the client | null | false | MEDIUM
-| *camel.sink.endpoint.autoCreateTopic* | Setting the autocreation of the topic | true | false | MEDIUM
+| *camel.sink.endpoint.autoCreateTopic* | Setting the autocreation of the topic | false | false | MEDIUM
 | *camel.sink.endpoint.headerFilterStrategy* | To use a custom HeaderFilterStrategy to map headers to/from Camel. | null | false | MEDIUM
 | *camel.sink.endpoint.kmsMasterKeyId* | The ID of an AWS-managed customer master key (CMK) for Amazon SNS or a custom CMK. | null | false | MEDIUM
 | *camel.sink.endpoint.lazyStartProducer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then cre [...]
@@ -56,7 +56,7 @@ The camel-aws2-sns sink connector supports 48 options, which are listed below.
 | *camel.sink.endpoint.accessKey* | Amazon AWS Access Key | null | false | MEDIUM
 | *camel.sink.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-sns.amazonSNSClient* | To use the AmazonSNS as the client | null | false | MEDIUM
-| *camel.component.aws2-sns.autoCreateTopic* | Setting the autocreation of the topic | true | false | MEDIUM
+| *camel.component.aws2-sns.autoCreateTopic* | Setting the autocreation of the topic | false | false | MEDIUM
 | *camel.component.aws2-sns.configuration* | Component configuration | null | false | MEDIUM
 | *camel.component.aws2-sns.kmsMasterKeyId* | The ID of an AWS-managed customer master key (CMK) for Amazon SNS or a custom CMK. | null | false | MEDIUM
 | *camel.component.aws2-sns.lazyStartProducer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed the [...]
diff --git a/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-sink-connector.adoc
index 9f58eef..80d7a6f 100644
--- a/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-sink-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-sqs sink connector supports 60 options, which are listed below.
 | *camel.sink.path.queueNameOrArn* | Queue name or ARN | null | true | HIGH
 | *camel.sink.endpoint.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.sink.endpoint.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.sink.endpoint.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.sink.endpoint.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.sink.endpoint.headerFilterStrategy* | To use a custom HeaderFilterStrategy to map headers to/from Camel. | null | false | MEDIUM
 | *camel.sink.endpoint.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.sink.endpoint.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
@@ -63,7 +63,7 @@ The camel-aws2-sqs sink connector supports 60 options, which are listed below.
 | *camel.sink.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-sqs.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.component.aws2-sqs.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.component.aws2-sqs.configuration* | The AWS SQS default configuration | null | false | MEDIUM
 | *camel.component.aws2-sqs.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-sqs.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-source-connector.adoc
index ba1b605..af08ebd 100644
--- a/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-aws2-sqs-kafka-source-connector.adoc
@@ -34,7 +34,7 @@ The camel-aws2-sqs source connector supports 93 options, which are listed below.
 | *camel.source.path.queueNameOrArn* | Queue name or ARN | null | true | HIGH
 | *camel.source.endpoint.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.source.endpoint.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.source.endpoint.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.source.endpoint.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.source.endpoint.headerFilterStrategy* | To use a custom HeaderFilterStrategy to map headers to/from Camel. | null | false | MEDIUM
 | *camel.source.endpoint.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.source.endpoint.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
@@ -89,7 +89,7 @@ The camel-aws2-sqs source connector supports 93 options, which are listed below.
 | *camel.source.endpoint.secretKey* | Amazon AWS Secret Key | null | false | MEDIUM
 | *camel.component.aws2-sqs.amazonAWSHost* | The hostname of the Amazon AWS cloud. | "amazonaws.com" | false | MEDIUM
 | *camel.component.aws2-sqs.amazonSQSClient* | To use the AmazonSQS as client | null | false | MEDIUM
-| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | true | false | MEDIUM
+| *camel.component.aws2-sqs.autoCreateQueue* | Setting the autocreation of the queue | false | false | MEDIUM
 | *camel.component.aws2-sqs.configuration* | The AWS SQS default configuration | null | false | MEDIUM
 | *camel.component.aws2-sqs.overrideEndpoint* | Set the need for overriding the endpoint. This option needs to be used in combination with the uriEndpointOverride option | false | false | MEDIUM
 | *camel.component.aws2-sqs.protocol* | The underlying protocol used to communicate with SQS | "https" | false | MEDIUM
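
The SNS and SQS docs show the analogous default change; a hedged sketch of restoring the previous auto-creation behaviour for both connectors (keys as listed in the tables above):

----
# opt back in to auto-creation after the defaults changed to false
camel.component.aws2-sns.autoCreateTopic=true
camel.component.aws2-sqs.autoCreateQueue=true
----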
diff --git a/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-sink-connector.adoc
index b00f735..872422c 100644
--- a/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSinkConn
 ----
 
 
-The camel-netty-http sink connector supports 111 options, which are listed below.
+The camel-netty-http sink connector supports 113 options, which are listed below.
 
 
 
@@ -56,6 +56,7 @@ The camel-netty-http sink connector supports 111 options, which are listed below
 | *camel.sink.endpoint.producerPoolMinEvictableIdle* | Sets the minimum amount of time (value in millis) an object may sit idle in the pool before it is eligible for eviction by the idle object evictor. | 300000L | false | MEDIUM
 | *camel.sink.endpoint.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.sink.endpoint.useRelativePath* | Sets whether to use a relative path in HTTP requests. | true | false | MEDIUM
+| *camel.sink.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.sink.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.sink.endpoint.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
 | *camel.sink.endpoint.configuration* | To use a custom configured NettyHttpConfiguration for configuring this endpoint. | null | false | MEDIUM
@@ -106,6 +107,7 @@ The camel-netty-http sink connector supports 111 options, which are listed below
 | *camel.component.netty-http.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.component.netty-http.udpConnectionless Sending* | This option supports connectionless UDP sending, which is a real fire-and-forget. A connected UDP send receives a PortUnreachableException if no one is listening on the receiving port. | false | false | MEDIUM
 | *camel.component.netty-http.useByteBuf* | If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out. | false | false | MEDIUM
+| *camel.component.netty-http.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty-http.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty-http.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty-http.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-source-connector.adoc
index 9d9a9b3..5ea81c5 100644
--- a/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-netty-http-kafka-source-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceCo
 ----
 
 
-The camel-netty-http source connector supports 131 options, which are listed below.
+The camel-netty-http source connector supports 133 options, which are listed below.
 
 
 
@@ -67,6 +67,7 @@ The camel-netty-http source connector supports 131 options, which are listed bel
 | *camel.source.endpoint.traceEnabled* | Specifies whether to enable HTTP TRACE for this Netty HTTP consumer. By default TRACE is turned off. | false | false | MEDIUM
 | *camel.source.endpoint.urlDecodeHeaders* | If this option is enabled, then during binding from Netty to Camel Message the header values will be URL decoded (e.g. %20 will be a space character). Notice this option is used by the default org.apache.camel.component.netty.http.NettyHttpBinding and therefore if you implement a custom org.apache.camel.component.netty.http.NettyHttpBinding then you would need to decode the headers according to this option. | false | false | MEDIUM
 | *camel.source.endpoint.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.source.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.source.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.source.endpoint.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
 | *camel.source.endpoint.configuration* | To use a custom configured NettyHttpConfiguration for configuring this endpoint. | null | false | MEDIUM
@@ -125,6 +126,7 @@ The camel-netty-http source connector supports 131 options, which are listed bel
 | *camel.component.netty-http.serverExceptionCaught LogLevel* | If the server (NettyConsumer) catches an exception then it is logged using this logging level. One of: [TRACE] [DEBUG] [INFO] [WARN] [ERROR] [OFF] | "WARN" | false | MEDIUM
 | *camel.component.netty-http.serverInitializer Factory* | To use a custom ServerInitializerFactory | null | false | MEDIUM
 | *camel.component.netty-http.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.component.netty-http.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty-http.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty-http.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty-http.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
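
Assuming a TLS-enabled netty-http endpoint, the new hostnameVerification option could be switched on at the component level as sketched below (it defaults to false, as the table shows):

----
# enable hostname verification on the SSLEngine (off by default)
camel.component.netty-http.hostnameVerification=true
----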
diff --git a/docs/modules/ROOT/pages/connectors/camel-netty-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-netty-kafka-sink-connector.adoc
index 39d68f8..3cd0dc3 100644
--- a/docs/modules/ROOT/pages/connectors/camel-netty-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-netty-kafka-sink-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.netty.CamelNettySinkConnector
 ----
 
 
-The camel-netty sink connector supports 107 options, which are listed below.
+The camel-netty sink connector supports 109 options, which are listed below.
 
 
 
@@ -53,6 +53,7 @@ The camel-netty sink connector supports 107 options, which are listed below.
 | *camel.sink.endpoint.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.sink.endpoint.udpConnectionlessSending* | This option supports connectionless UDP sending, which is a real fire-and-forget. A connected UDP send receives a PortUnreachableException if no one is listening on the receiving port. | false | false | MEDIUM
 | *camel.sink.endpoint.useByteBuf* | If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out. | false | false | MEDIUM
+| *camel.sink.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.sink.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.sink.endpoint.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
 | *camel.sink.endpoint.nativeTransport* | Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms. You need to add the netty JAR for the host operating system you are using. See more details at: \http://netty.io/wiki/native-transports.html | false | false | MEDIUM
@@ -105,6 +106,7 @@ The camel-netty sink connector supports 107 options, which are listed below.
 | *camel.component.netty.producerPoolMinIdle* | Sets the minimum number of instances allowed in the producer pool before the evictor thread (if active) spawns new objects. | null | false | MEDIUM
 | *camel.component.netty.udpConnectionlessSending* | This option supports connectionless UDP sending, which is a real fire-and-forget. A connected UDP send receives a PortUnreachableException if no one is listening on the receiving port. | false | false | MEDIUM
 | *camel.component.netty.useByteBuf* | If the useByteBuf is true, netty producer will turn the message body into ByteBuf before sending it out. | false | false | MEDIUM
+| *camel.component.netty.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
diff --git a/docs/modules/ROOT/pages/connectors/camel-netty-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-netty-kafka-source-connector.adoc
index ad91475..07a8199 100644
--- a/docs/modules/ROOT/pages/connectors/camel-netty-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-netty-kafka-source-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.netty.CamelNettySourceConnector
 ----
 
 
-The camel-netty source connector supports 119 options, which are listed below.
+The camel-netty source connector supports 121 options, which are listed below.
 
 
 
@@ -58,6 +58,7 @@ The camel-netty source connector supports 119 options, which are listed below.
 | *camel.source.endpoint.serverExceptionCaughtLog Level* | If the server (NettyConsumer) catches an exception then it is logged using this logging level. One of: [TRACE] [DEBUG] [INFO] [WARN] [ERROR] [OFF] | "WARN" | false | MEDIUM
 | *camel.source.endpoint.serverInitializerFactory* | To use a custom ServerInitializerFactory | null | false | MEDIUM
 | *camel.source.endpoint.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.source.endpoint.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.source.endpoint.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.source.endpoint.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
 | *camel.source.endpoint.nativeTransport* | Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms. You need to add the netty JAR for the host operating system you are using. See more details at: \http://netty.io/wiki/native-transports.html | false | false | MEDIUM
@@ -116,6 +117,7 @@ The camel-netty source connector supports 119 options, which are listed below.
 | *camel.component.netty.serverExceptionCaughtLog Level* | If the server (NettyConsumer) catches an exception then it is logged using this logging level. One of: [TRACE] [DEBUG] [INFO] [WARN] [ERROR] [OFF] | "WARN" | false | MEDIUM
 | *camel.component.netty.serverInitializerFactory* | To use a custom ServerInitializerFactory | null | false | MEDIUM
 | *camel.component.netty.usingExecutorService* | Whether to use ordered thread pool, to ensure events are processed orderly on the same channel. | true | false | MEDIUM
+| *camel.component.netty.hostnameVerification* | To enable/disable hostname verification on SSLEngine | false | false | MEDIUM
 | *camel.component.netty.allowSerializedHeaders* | Only used for TCP when transferExchange is true. When set to true, serializable objects in headers and properties will be added to the exchange. Otherwise Camel will exclude any non-serializable objects and log it at WARN level. | false | false | MEDIUM
 | *camel.component.netty.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
 | *camel.component.netty.channelGroup* | To use an explicit ChannelGroup. | null | false | MEDIUM
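
The plain netty connector gains the same switch; an endpoint-level example for a TLS sink, illustrative only:

----
# hostname verification is off by default; enable it for this endpoint
camel.sink.endpoint.hostnameVerification=true
----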
diff --git a/docs/modules/ROOT/pages/connectors/camel-spring-rabbitmq-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-spring-rabbitmq-kafka-source-connector.adoc
index c3dcc6a..467a171 100644
--- a/docs/modules/ROOT/pages/connectors/camel-spring-rabbitmq-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-spring-rabbitmq-kafka-source-connector.adoc
@@ -24,7 +24,7 @@ connector.class=org.apache.camel.kafkaconnector.springrabbitmq.CamelSpringrabbit
 ----
 
 
-The camel-spring-rabbitmq source connector supports 44 options, which are listed below.
+The camel-spring-rabbitmq source connector supports 50 options, which are listed below.
 
 
 
@@ -49,8 +49,11 @@ The camel-spring-rabbitmq source connector supports 44 options, which are listed
 | *camel.source.endpoint.exclusive* | Set to true for an exclusive consumer | false | false | MEDIUM
 | *camel.source.endpoint.noLocal* | Set to true for a no-local consumer | false | false | MEDIUM
 | *camel.source.endpoint.queues* | The queue(s) to use for consuming messages. Multiple queue names can be separated by comma. If none has been configured then Camel will generate a unique id as the queue name for the consumer. | null | false | MEDIUM
+| *camel.source.endpoint.concurrentConsumers* | The number of consumers | null | false | MEDIUM
 | *camel.source.endpoint.exceptionHandler* | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | null | false | MEDIUM
 | *camel.source.endpoint.exchangePattern* | Sets the exchange pattern when the consumer creates an exchange. One of: [InOnly] [InOut] [InOptionalOut] | null | false | MEDIUM
+| *camel.source.endpoint.maxConcurrentConsumers* | The maximum number of consumers (available only with SMLC) | null | false | MEDIUM
+| *camel.source.endpoint.messageListenerContainerType* | The type of the MessageListenerContainer. One of: [DMLC] [SMLC] | "DMLC" | false | MEDIUM
 | *camel.source.endpoint.prefetchCount* | Tell the broker how many messages to send in a single request. Often this can be set quite high to improve throughput. | null | false | MEDIUM
 | *camel.source.endpoint.args* | Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.consumer. arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dlq.queue. arg.dlq.binding. For example to declare a queue with message ttl argument: args=arg.queue.x-message-ttl=60000 | null | false | MEDIUM
 | *camel.source.endpoint.messageConverter* | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | null | false | MEDIUM
@@ -66,8 +69,11 @@ The camel-spring-rabbitmq source connector supports 44 options, which are listed
 | *camel.component.spring-rabbitmq.deadLetterExchange Type* | The type of the dead letter exchange. One of: [direct] [fanout] [headers] [topic] | "direct" | false | MEDIUM
 | *camel.component.spring-rabbitmq.deadLetterQueue* | The name of the dead letter queue | null | false | MEDIUM
 | *camel.component.spring-rabbitmq.deadLetterRouting Key* | The routing key for the dead letter exchange | null | false | MEDIUM
+| *camel.component.spring-rabbitmq.concurrent Consumers* | The number of consumers | 1 | false | MEDIUM
 | *camel.component.spring-rabbitmq.errorHandler* | To use a custom ErrorHandler for handling exceptions from the message listener (consumer) | null | false | MEDIUM
 | *camel.component.spring-rabbitmq.listenerContainer Factory* | To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages | null | false | MEDIUM
+| *camel.component.spring-rabbitmq.maxConcurrent Consumers* | The maximum number of consumers (available only with SMLC) | null | false | MEDIUM
+| *camel.component.spring-rabbitmq.messageListener ContainerType* | The type of the MessageListenerContainer. One of: [DMLC] [SMLC] | "DMLC" | false | MEDIUM
 | *camel.component.spring-rabbitmq.prefetchCount* | Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. | 250 | false | MEDIUM
 | *camel.component.spring-rabbitmq.shutdownTimeout* | The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. | 5000L | false | MEDIUM
 | *camel.component.spring-rabbitmq.autowiredEnabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | false | MEDIUM
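
A hedged example of combining the new consumer-concurrency options in a camel-spring-rabbitmq source connector configuration (exchange, queue and topic names are placeholders; maxConcurrentConsumers is only honored with the SMLC container type):

----
connector.class=org.apache.camel.kafkaconnector.springrabbitmq.CamelSpringrabbitmqSourceConnector
topics=mytopic
# exchange and queue names are placeholders for this sketch
camel.source.path.exchangeName=myexchange
camel.source.endpoint.queues=myqueue
# switch to the SimpleMessageListenerContainer so consumers can scale up to the max
camel.source.endpoint.messageListenerContainerType=SMLC
camel.source.endpoint.concurrentConsumers=2
camel.source.endpoint.maxConcurrentConsumers=5
----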


[camel-kafka-connector] 08/09: Adjusted catalog tests for camel 3.9.0

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e80640135a01f3e79afea0a707c61940849fef77
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Wed Mar 10 16:15:01 2021 +0100

    Adjusted catalog tests for camel 3.9.0
---
 .../camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java b/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
index 15a34ea..114e311 100644
--- a/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
+++ b/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
@@ -77,7 +77,7 @@ class CamelKafkaConnectorCatalogTest {
         assertEquals("camel.sink.path.topicNameOrArn", model.getOptions().get(0).getName());
         assertEquals("camel.sink.endpoint.amazonSNSClient", model.getOptions().get(1).getName());
         assertEquals("camel.sink.endpoint.autoCreateTopic", model.getOptions().get(2).getName());
-        assertEquals("true", model.getOptions().get(2).getDefaultValue());
+        assertEquals("false", model.getOptions().get(2).getDefaultValue());
         assertNull(model.getOptions().get(1).getDefaultValue());
         assertNull(model.getConverters());
         assertNull(model.getTransforms());


[camel-kafka-connector] 01/09: fixed #980 : camel.source.contentLogLevel config not honored in source connectors

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f3157198f17608359bea0b1a3576667bcae96bf5
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Thu Feb 4 23:12:27 2021 +0100

    fixed #980 : camel.source.contentLogLevel config not honored in source connectors
---
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java    | 14 +++++++++++---
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java  | 15 +++++++++++----
 .../apache/camel/kafkaconnector/CamelSinkTaskTest.java    | 13 +++++++++++++
 .../apache/camel/kafkaconnector/CamelSourceTaskTest.java  | 14 ++++++++++++++
 4 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 50a44da..bc6a76c 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -54,10 +54,10 @@ public class CamelSinkTask extends SinkTask {
     private static final String LOCAL_URL = "direct:start";
     private ErrantRecordReporter reporter;
 
-
     private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
     private Endpoint localEndpoint;
+
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
     private boolean mapProperties;
     private boolean mapHeaders;
@@ -78,11 +78,11 @@ public class CamelSinkTask extends SinkTask {
                 reporter = context.errantRecordReporter();
             }
 
+            String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
                 loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+                LOG.debug("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             }
 
             String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
@@ -234,4 +234,12 @@ public class CamelSinkTask extends SinkTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 4c138af..16e6bfc 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -73,11 +73,11 @@ public class CamelSourceTask extends SourceTask {
             Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
             CamelSourceConnectorConfig config = getCamelSourceConnectorConfig(actualProps);
 
+            String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
-                loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase());
+                loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
+                LOG.error("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             }
 
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
@@ -153,7 +153,6 @@ public class CamelSourceTask extends SourceTask {
         return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
     }
 
-
     @Override
     public synchronized List<SourceRecord> poll() {
         final long startPollEpochMilli = Instant.now().toEpochMilli();
@@ -313,4 +312,12 @@ public class CamelSourceTask extends SourceTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 5aaca7f..bab0a5d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -1111,4 +1111,17 @@ public class CamelSinkTaskTest {
         }
     }
 
+    @Test
+    public void testContentLogLevelConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+        assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+
+        sinkTask.stop();
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b2a7c4e..21d56fc 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -607,4 +607,18 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+
+    @Test
+    public void testContentLogLevelConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, "INFO");
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+        assertEquals(LoggingLevel.INFO, sourceTask.getLoggingLevel());
+
+        sourceTask.stop();
+    }
 }
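
For reference, a minimal sketch of the configuration this fix makes effective (the levels are examples; any org.apache.camel.LoggingLevel name such as TRACE, DEBUG, INFO, WARN, ERROR or OFF is valid, and the sink property name is assumed to mirror the source one):

----
# source connectors: log the content of polled exchanges at INFO
camel.source.contentLogLevel=INFO
# sink connectors: log the content of incoming records at DEBUG
camel.sink.contentLogLevel=DEBUG
----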


[camel-kafka-connector] 04/09: fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 290f065ea34dfd5a97679b58e2acff132b1feceb
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:17:03 2021 +0100

    fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string.
---
 tests/itests-netty-http/pom.xml                                  | 9 +++++++++
 .../{surce => source}/CamelNettyHTTPPropertyFactory.java         | 3 +--
 .../nettyhttp/{surce => source}/CamelSourceNettyHTTPITCase.java  | 4 ++--
 tests/pom.xml                                                    | 1 +
 4 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
index ddb8bf7..88a0929 100644
--- a/tests/itests-netty-http/pom.xml
+++ b/tests/itests-netty-http/pom.xml
@@ -37,6 +37,15 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- test infra -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-common</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-netty-http</artifactId>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
similarity index 97%
rename from tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
rename to tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
index 1562328..e4df820 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.camel.kafkaconnector.nettyhttp.surce;
+package org.apache.camel.kafkaconnector.nettyhttp.source;
 
 import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
 
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
similarity index 98%
rename from tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
rename to tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index e7e6468..41cb6e1 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.camel.kafkaconnector.nettyhttp.surce;
+package org.apache.camel.kafkaconnector.nettyhttp.source;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
@@ -115,6 +114,7 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
         }
     }
 
+    @Override
     protected void verifyMessages(TestMessageConsumer<?> consumer) {
         int received = consumer.consumedMessages().size();
         assertEquals(expect, received, "Didn't process the expected amount of messages");
diff --git a/tests/pom.xml b/tests/pom.xml
index 4009424..abae69b 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -53,6 +53,7 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
+        <module>itests-netty-http</module>
         <module>itests-jdbc</module>
         <module>itests-azure-storage-blob</module>
         <module>itests-azure-storage-queue</module>
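
The String conversion in the itest is driven by the CamelTypeConverterTransform SMT; below is a sketch of the equivalent connector properties, mirroring what CamelNettyHTTPPropertyFactory.withCamelTypeConverterTransformTo("java.lang.String") and the value converter setting apply (the full factory appears later in this thread):

----
# settings taken from the itest property factory in this change
transforms=cameltypeconverter
transforms.cameltypeconverter.type=org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value
transforms.cameltypeconverter.target.type=java.lang.String
value.converter=org.apache.kafka.connect.storage.StringConverter
----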


[camel-kafka-connector] 02/09: Fixed flaky hdfs itest.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit dff4d98b47cfc4b75038bb2c6422616fe39cacc9
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:12:27 2021 +0100

    Fixed flaky hdfs itest.
---
 .../hdfs/sink/CamelSinkHDFSITCase.java             |  28 +++--
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  |  26 ++++-
 tests/itests-netty-http/pom.xml                    |  61 ++++++++++
 .../surce/CamelNettyHTTPPropertyFactory.java       |  61 ++++++++++
 .../surce/CamelSourceNettyHTTPITCase.java          | 123 +++++++++++++++++++++
 5 files changed, 287 insertions(+), 12 deletions(-)

diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index f12f310..55cf21f 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -17,11 +17,6 @@
 
 package org.apache.camel.kafkaconnector.hdfs.sink;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -37,9 +32,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -74,16 +75,23 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
     }
 
     @BeforeEach
-    public void setUp() throws IOException, URISyntaxException {
+    public void setUp() throws IOException, URISyntaxException, InitializationError {
         topicName = getTopicForTest(this);
         hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
 
         String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
         currentBasePath = new Path(currentPath);
 
-        if (!hdfsEasy.delete(currentBasePath)) {
-            // This is OK: directory may not exist on the path
-            LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+        boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test")
+                                                                        &&  hdfsEasy.delete(new Path(currentBasePath, "initTest")));
+
+        if(hdfsServiceCorrectlyStarted) {
+            if (!hdfsEasy.delete(currentBasePath)) {
+                // This is OK: directory may not exist on the path
+                LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+            }
+        } else {
+            throw new InitializationError("HDFS Service didn't start properly.");
         }
     }
 
@@ -136,7 +144,7 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
 
             LOG.debug("Retrieved file {} with contents: {}", f.getPath(), contents);
             boolean contains = contents.contains(matchString);
-            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName());
+            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName() + " content: [" + contents + "] should contain [" + matchString + "]");
         } catch (IOException e) {
             LOG.debug("Reading returned file {} failed: {}", f.getPath(), e.getMessage());
             fail("I/O error: " + e.getMessage());
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
index 7733fe8..4e95191 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
@@ -26,6 +26,7 @@ import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -110,8 +111,7 @@ public class HDFSEasy {
         try {
             return countFiles(path) >= minFiles;
         } catch (Exception e) {
-            LOG.warn("I/O exception while checking if file {} exists", path.getName());
-
+            LOG.warn("I/O exception: {} due to {} while checking if file {} exists", e.getMessage(), e.getCause(), path.getName());
             return false;
         }
     }
@@ -133,4 +133,26 @@ public class HDFSEasy {
             return false;
         }
     }
+
+    public boolean createFile(Path filePath, String content) {
+        FSDataOutputStream streamWriter = null;
+        try {
+            streamWriter = dfs.create(filePath);
+            streamWriter.writeBytes(content);
+            streamWriter.flush();
+        } catch (IOException e) {
+            LOG.debug("Error in file creation: " + e.getMessage());
+            return false;
+        } finally {
+            if (streamWriter != null) {
+                try {
+                    streamWriter.close();
+                } catch (IOException e) {
+                    LOG.debug("Error in file creation during stream close: " + e.getMessage());
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
new file mode 100644
index 0000000..ddb8bf7
--- /dev/null
+++ b/tests/itests-netty-http/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-netty-http</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Netty HTTP</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-netty-http</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
new file mode 100644
index 0000000..1562328
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyHTTPPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyHTTPPropertyFactory> {
+    private CamelNettyHTTPPropertyFactory() {
+
+    }
+
+    public CamelNettyHTTPPropertyFactory withHost(String host) {
+        return setProperty("camel.source.path.host", host);
+    }
+
+    public CamelNettyHTTPPropertyFactory withProtocol(String protocol) {
+        return setProperty("camel.source.path.protocol", protocol);
+    }
+
+    public CamelNettyHTTPPropertyFactory withPort(int port) {
+        return setProperty("camel.source.path.port", String.valueOf(port));
+    }
+
+    public CamelNettyHTTPPropertyFactory withSync(boolean sync) {
+        return setProperty("camel.source.endpoint.sync", String.valueOf(sync));
+    }
+
+    public CamelNettyHTTPPropertyFactory withReceiveBufferSize(int size) {
+        return setProperty("camel.source.endpoint.receiveBufferSize", String.valueOf(size));
+    }
+
+    public CamelNettyHTTPPropertyFactory withCamelTypeConverterTransformTo(String targetClass) {
+        setProperty("transforms", "cameltypeconverter");
+        setProperty("transforms.cameltypeconverter.type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value");
+        return setProperty("transforms.cameltypeconverter.target.type", targetClass);
+    }
+
+    public static CamelNettyHTTPPropertyFactory basic() {
+        return new CamelNettyHTTPPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelNettyHttpSourceConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
new file mode 100644
index 0000000..e7e6468
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final String TEST_MESSAGE = "testMessage";
+
+    private String topicName;
+
+    private final int expect = 1;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterEach
+    public void tearDown() {}
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+                .withKafkaTopic(topicName)
+                .withReceiveBufferSize(10)
+                .withHost("0.0.0.0")
+                .withPort(HTTP_PORT)
+                .withProtocol("http")
+                .withCamelTypeConverterTransformTo("java.lang.String");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Override
+    protected void produceTestData() {
+        int retriesLeft = 10;
+        boolean success = false;
+        while(retriesLeft > 0 && !success) {
+            try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
+
+                byte[] ipAddr = new byte[]{127, 0, 0, 1};
+                InetAddress localhost = InetAddress.getByAddress(ipAddr);
+                final HttpPost httpPost = new HttpPost("http://" + localhost.getHostAddress() + ":" + HTTP_PORT);
+
+                LOG.info("Executing request {} {}", httpPost.getMethod(), httpPost.getURI());
+
+                httpPost.setEntity(new StringEntity(TEST_MESSAGE));
+
+                CloseableHttpResponse response = httpclient.execute(httpPost);
+                assertEquals(200, response.getStatusLine().getStatusCode());
+                response.close();
+                httpPost.releaseConnection();
+                success = true;
+                LOG.info("Request success at {} attempt.", retriesLeft);
+            } catch (IOException e) {
+                if(retriesLeft == 1) {
+                    e.printStackTrace();
+                    fail("There should be no exceptions in sending the http test message.");
+                } else {
+                    retriesLeft--;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException interruptedException) {
+                        interruptedException.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+        assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+    }
+}