You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/02/03 13:08:55 UTC

[camel-kafka-connector] branch camel-master-master-align created (now 4bb4af5)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch camel-master-master-align
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 4bb4af5  SJMS2 itests: Avoid creating a new consumer for every message received

This branch includes the following new commits:

     new 046ca03  Added a basic test for idempotency
     new e689514  Avoid reusing the same destination name for the sjms2 idempotency tests
     new 6e57e05  Added idempotency test for SJMS2 using header expressions
     new 98fefda  Add connector entries to doc nav
     new 4bb4af5  SJMS2 itests: Avoid creating a new consumer for every message received

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 03/05: Added idempotency test for SJMS2 using header expressions

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master-master-align
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 6e57e058d712e9184687cb961e25500c7605780f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 09:11:34 2021 +0100

    Added idempotency test for SJMS2 using header expressions
---
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 80 +++++++++++++++++++---
 1 file changed, 69 insertions(+), 11 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 566e823..d2f06a7 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +30,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
+import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -53,6 +56,11 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
+    @FunctionalInterface
+    interface Producer {
+        void producerMessages();
+    }
+
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -84,7 +92,8 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
     public void setUp() {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
-        topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
@@ -142,7 +151,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         }
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
@@ -151,14 +160,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         LOG.debug("Creating the consumer ...");
         service.submit(() -> consumeJMSMessages());
 
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            LOG.debug("Sending message 1/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-            LOG.debug("Sending message 2/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-        }
+        producer.producerMessages();
 
         LOG.debug("Waiting for the messages to be processed");
         service.shutdown();
@@ -170,6 +172,39 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         }
     }
 
+    private void produceMessagesNoProperties() {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                LOG.debug("Sending message 1/2");
+                kafkaClient.produce(topic, "Sink test message " + i);
+                LOG.debug("Sending message 2/2");
+                kafkaClient.produce(topic, "Sink test message " + i);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    private void produceMessagesWithProperties() {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                Map<String, String> headers = new HashMap<>();
+                int randomNumber = TestUtils.randomWithRange(1, 1000);
+
+                headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "MessageNumber", String.valueOf(i));
+
+                kafkaClient.produce(topic, "Sink test message " + randomNumber, headers);
+                kafkaClient.produce(topic, "Sink test message " + randomNumber + 1, headers);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
     @Test
     @Timeout(90)
     public void testIdempotentBodySendReceive() {
@@ -184,7 +219,30 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
                         .withExpressionType("body")
                         .end();
 
-            runTest(connectorPropertyFactory);
+            runTest(connectorPropertyFactory, this::produceMessagesNoProperties);
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testIdempotentHeaderSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(topic)
+                    .withConnectionProperties(connectionProperties())
+                    .withDestinationName(destinationName)
+                    .withIdempotency()
+                    .withRepositoryType("memory")
+                    .withExpressionType("header")
+                    .withExpressionHeader("MessageNumber")
+                    .end();
+
+            runTest(connectorPropertyFactory, this::produceMessagesWithProperties);
 
         } catch (Exception e) {
             LOG.error("JMS test failed: {}", e.getMessage(), e);


[camel-kafka-connector] 02/05: Avoid reusing the same destination name for the sjms2 idempotency tests

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master-master-align
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e6895143a50fdbf374eb906cecaf7c88764fc622
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 08:46:56 2021 +0100

    Avoid reusing the same destination name for the sjms2 idempotency tests
---
 .../kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 8eceee2..566e823 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -62,6 +62,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);
 
     private String topic;
+    private String destinationName;
     private int received;
     private final int expect = 10;
 
@@ -84,6 +85,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
         topic = TestUtils.getDefaultTestTopic(this.getClass());
+        destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
     private boolean checkRecord(Message jmsMessage) {
@@ -111,7 +113,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
             jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
             jmsClient.start();
 
-            try (MessageConsumer consumer = jmsClient.createConsumer(SJMS2Common.DEFAULT_JMS_QUEUE)) {
+            try (MessageConsumer consumer = jmsClient.createConsumer(destinationName)) {
                 // number of retries until stale
                 int retries = 10;
 
@@ -176,7 +178,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
                     .basic()
                     .withTopics(topic)
                     .withConnectionProperties(connectionProperties())
-                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .withDestinationName(destinationName)
                     .withIdempotency()
                         .withRepositoryType("memory")
                         .withExpressionType("body")


[camel-kafka-connector] 05/05: SJMS2 itests: Avoid creating a new consumer for every message received

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master-master-align
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4bb4af525e0a1f59fb814d667a2cb8fbd61d40aa
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 08:36:34 2021 +0100

    SJMS2 itests: Avoid creating a new consumer for every message received
---
 .../camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java       | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index c67bba6..41b87a8 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -175,9 +176,10 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
             jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
 
             jmsClient.start();
-
-            for (int i = 0; i < expect; i++) {
-                jmsClient.receive(SJMS2Common.DEFAULT_JMS_QUEUE, this::checkRecord);
+            try (MessageConsumer consumer = jmsClient.createConsumer(SJMS2Common.DEFAULT_JMS_QUEUE)) {
+                for (int i = 0; i < expect; i++) {
+                    jmsClient.receive(consumer, this::checkRecord);
+                }
             }
 
         } catch (Exception e) {


[camel-kafka-connector] 04/05: Add connector entries to doc nav

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master-master-align
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 98fefdac9259dc2fd597556702cefcd5f71130ac
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Wed Feb 3 17:27:00 2021 +0900

    Add connector entries to doc nav
---
 docs/modules/ROOT/nav.adoc                         | 884 +++++++++++++++++++++
 .../maven/docs/UpdateDocComponentsListMojo.java    |  49 +-
 .../src/main/resources/nav.mvel                    |   8 +
 3 files changed, 939 insertions(+), 2 deletions(-)

diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc
index 41a65be..c9a811c 100644
--- a/docs/modules/ROOT/nav.adoc
+++ b/docs/modules/ROOT/nav.adoc
@@ -13,6 +13,890 @@
 ** xref:try-it-out-on-openshift-with-strimzi.adoc[Try it on OpenShift cluster]
 ** xref:getting-started-with-packages.adoc[Packages documentation]
 * xref:connectors.adoc[Connectors list]
+// connectors: START
+** camel-activemq-kafka-connector
+*** xref:connectors/camel-activemq-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-activemq-kafka-sink-connector.adoc[Sink Docs]
+** camel-ahc-kafka-connector
+*** xref:connectors/camel-ahc-kafka-sink-connector.adoc[Sink Docs]
+** camel-ahc-ws-kafka-connector
+*** xref:connectors/camel-ahc-ws-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ahc-ws-kafka-sink-connector.adoc[Sink Docs]
+** camel-ahc-wss-kafka-connector
+*** xref:connectors/camel-ahc-wss-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ahc-wss-kafka-sink-connector.adoc[Sink Docs]
+** camel-amqp-kafka-connector
+*** xref:connectors/camel-amqp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-amqp-kafka-sink-connector.adoc[Sink Docs]
+** camel-apns-kafka-connector
+*** xref:connectors/camel-apns-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-apns-kafka-sink-connector.adoc[Sink Docs]
+** camel-arangodb-kafka-connector
+*** xref:connectors/camel-arangodb-kafka-sink-connector.adoc[Sink Docs]
+** camel-as2-kafka-connector
+*** xref:connectors/camel-as2-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-as2-kafka-sink-connector.adoc[Sink Docs]
+** camel-asterisk-kafka-connector
+*** xref:connectors/camel-asterisk-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-asterisk-kafka-sink-connector.adoc[Sink Docs]
+** camel-atlasmap-kafka-connector
+*** xref:connectors/camel-atlasmap-kafka-sink-connector.adoc[Sink Docs]
+** camel-atmos-kafka-connector
+*** xref:connectors/camel-atmos-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-atmos-kafka-sink-connector.adoc[Sink Docs]
+** camel-atmosphere-websocket-kafka-connector
+*** xref:connectors/camel-atmosphere-websocket-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-atmosphere-websocket-kafka-sink-connector.adoc[Sink Docs]
+** camel-atom-kafka-connector
+*** xref:connectors/camel-atom-kafka-source-connector.adoc[Source Docs]
+** camel-atomix-map-kafka-connector
+*** xref:connectors/camel-atomix-map-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-atomix-map-kafka-sink-connector.adoc[Sink Docs]
+** camel-atomix-messaging-kafka-connector
+*** xref:connectors/camel-atomix-messaging-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-atomix-messaging-kafka-sink-connector.adoc[Sink Docs]
+** camel-atomix-multimap-kafka-connector
+*** xref:connectors/camel-atomix-multimap-kafka-sink-connector.adoc[Sink Docs]
+** camel-atomix-queue-kafka-connector
+*** xref:connectors/camel-atomix-queue-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-atomix-queue-kafka-sink-connector.adoc[Sink Docs]
+** camel-atomix-set-kafka-connector
+*** xref:connectors/camel-atomix-set-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-atomix-set-kafka-sink-connector.adoc[Sink Docs]
+** camel-atomix-value-kafka-connector
+*** xref:connectors/camel-atomix-value-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-atomix-value-kafka-sink-connector.adoc[Sink Docs]
+** camel-avro-kafka-connector
+*** xref:connectors/camel-avro-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-avro-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-cw-kafka-connector
+*** xref:connectors/camel-aws-cw-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ddb-kafka-connector
+*** xref:connectors/camel-aws-ddb-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ddbstream-kafka-connector
+*** xref:connectors/camel-aws-ddbstream-kafka-source-connector.adoc[Source Docs]
+** camel-aws-ec2-kafka-connector
+*** xref:connectors/camel-aws-ec2-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ecs-kafka-connector
+*** xref:connectors/camel-aws-ecs-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-eks-kafka-connector
+*** xref:connectors/camel-aws-eks-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-iam-kafka-connector
+*** xref:connectors/camel-aws-iam-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-kinesis-firehose-kafka-connector
+*** xref:connectors/camel-aws-kinesis-firehose-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-kinesis-kafka-connector
+*** xref:connectors/camel-aws-kinesis-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-kinesis-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-kms-kafka-connector
+*** xref:connectors/camel-aws-kms-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-lambda-kafka-connector
+*** xref:connectors/camel-aws-lambda-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-mq-kafka-connector
+*** xref:connectors/camel-aws-mq-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-msk-kafka-connector
+*** xref:connectors/camel-aws-msk-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-s3-kafka-connector
+*** xref:connectors/camel-aws-s3-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-s3-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-sdb-kafka-connector
+*** xref:connectors/camel-aws-sdb-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-ses-kafka-connector
+*** xref:connectors/camel-aws-ses-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-sns-kafka-connector
+*** xref:connectors/camel-aws-sns-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-sqs-kafka-connector
+*** xref:connectors/camel-aws-sqs-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-sqs-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-swf-kafka-connector
+*** xref:connectors/camel-aws-swf-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws-swf-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws-translate-kafka-connector
+*** xref:connectors/camel-aws-translate-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-athena-kafka-connector
+*** xref:connectors/camel-aws2-athena-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-cw-kafka-connector
+*** xref:connectors/camel-aws2-cw-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-ddb-kafka-connector
+*** xref:connectors/camel-aws2-ddb-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-ddbstream-kafka-connector
+*** xref:connectors/camel-aws2-ddbstream-kafka-source-connector.adoc[Source Docs]
+** camel-aws2-ec2-kafka-connector
+*** xref:connectors/camel-aws2-ec2-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-ecs-kafka-connector
+*** xref:connectors/camel-aws2-ecs-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-eks-kafka-connector
+*** xref:connectors/camel-aws2-eks-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-eventbridge-kafka-connector
+*** xref:connectors/camel-aws2-eventbridge-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-iam-kafka-connector
+*** xref:connectors/camel-aws2-iam-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-kinesis-firehose-kafka-connector
+*** xref:connectors/camel-aws2-kinesis-firehose-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-kinesis-kafka-connector
+*** xref:connectors/camel-aws2-kinesis-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws2-kinesis-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-kms-kafka-connector
+*** xref:connectors/camel-aws2-kms-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-lambda-kafka-connector
+*** xref:connectors/camel-aws2-lambda-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-mq-kafka-connector
+*** xref:connectors/camel-aws2-mq-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-msk-kafka-connector
+*** xref:connectors/camel-aws2-msk-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-s3-kafka-connector
+*** xref:connectors/camel-aws2-s3-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws2-s3-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-ses-kafka-connector
+*** xref:connectors/camel-aws2-ses-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-sns-kafka-connector
+*** xref:connectors/camel-aws2-sns-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-sqs-kafka-connector
+*** xref:connectors/camel-aws2-sqs-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-aws2-sqs-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-sts-kafka-connector
+*** xref:connectors/camel-aws2-sts-kafka-sink-connector.adoc[Sink Docs]
+** camel-aws2-translate-kafka-connector
+*** xref:connectors/camel-aws2-translate-kafka-sink-connector.adoc[Sink Docs]
+** camel-azure-blob-kafka-connector
+*** xref:connectors/camel-azure-blob-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-azure-blob-kafka-sink-connector.adoc[Sink Docs]
+** camel-azure-eventhubs-kafka-connector
+*** xref:connectors/camel-azure-eventhubs-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-azure-eventhubs-kafka-sink-connector.adoc[Sink Docs]
+** camel-azure-queue-kafka-connector
+*** xref:connectors/camel-azure-queue-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-azure-queue-kafka-sink-connector.adoc[Sink Docs]
+** camel-azure-storage-blob-kafka-connector
+*** xref:connectors/camel-azure-storage-blob-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-azure-storage-blob-kafka-sink-connector.adoc[Sink Docs]
+** camel-azure-storage-queue-kafka-connector
+*** xref:connectors/camel-azure-storage-queue-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-azure-storage-queue-kafka-sink-connector.adoc[Sink Docs]
+** camel-bean-kafka-connector
+*** xref:connectors/camel-bean-kafka-sink-connector.adoc[Sink Docs]
+** camel-beanstalk-kafka-connector
+*** xref:connectors/camel-beanstalk-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-beanstalk-kafka-sink-connector.adoc[Sink Docs]
+** camel-box-kafka-connector
+*** xref:connectors/camel-box-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-box-kafka-sink-connector.adoc[Sink Docs]
+** camel-braintree-kafka-connector
+*** xref:connectors/camel-braintree-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-braintree-kafka-sink-connector.adoc[Sink Docs]
+** camel-caffeine-cache-kafka-connector
+*** xref:connectors/camel-caffeine-cache-kafka-sink-connector.adoc[Sink Docs]
+** camel-caffeine-loadcache-kafka-connector
+*** xref:connectors/camel-caffeine-loadcache-kafka-sink-connector.adoc[Sink Docs]
+** camel-chatscript-kafka-connector
+*** xref:connectors/camel-chatscript-kafka-sink-connector.adoc[Sink Docs]
+** camel-chunk-kafka-connector
+*** xref:connectors/camel-chunk-kafka-sink-connector.adoc[Sink Docs]
+** camel-cm-sms-kafka-connector
+*** xref:connectors/camel-cm-sms-kafka-sink-connector.adoc[Sink Docs]
+** camel-cmis-kafka-connector
+*** xref:connectors/camel-cmis-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-cmis-kafka-sink-connector.adoc[Sink Docs]
+** camel-coap-kafka-connector
+*** xref:connectors/camel-coap-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-coap-kafka-sink-connector.adoc[Sink Docs]
+** camel-coap-tcp-kafka-connector
+*** xref:connectors/camel-coap+tcp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-coap+tcp-kafka-sink-connector.adoc[Sink Docs]
+** camel-coaps-kafka-connector
+*** xref:connectors/camel-coaps-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-coaps-kafka-sink-connector.adoc[Sink Docs]
+** camel-coaps-tcp-kafka-connector
+*** xref:connectors/camel-coaps+tcp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-coaps+tcp-kafka-sink-connector.adoc[Sink Docs]
+** camel-cometd-kafka-connector
+*** xref:connectors/camel-cometd-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-cometd-kafka-sink-connector.adoc[Sink Docs]
+** camel-cometds-kafka-connector
+*** xref:connectors/camel-cometds-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-cometds-kafka-sink-connector.adoc[Sink Docs]
+** camel-consul-kafka-connector
+*** xref:connectors/camel-consul-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-consul-kafka-sink-connector.adoc[Sink Docs]
+** camel-controlbus-kafka-connector
+*** xref:connectors/camel-controlbus-kafka-sink-connector.adoc[Sink Docs]
+** camel-corda-kafka-connector
+*** xref:connectors/camel-corda-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-corda-kafka-sink-connector.adoc[Sink Docs]
+** camel-couchbase-kafka-connector
+*** xref:connectors/camel-couchbase-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-couchbase-kafka-sink-connector.adoc[Sink Docs]
+** camel-couchdb-kafka-connector
+*** xref:connectors/camel-couchdb-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-couchdb-kafka-sink-connector.adoc[Sink Docs]
+** camel-cql-kafka-connector
+*** xref:connectors/camel-cql-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-cql-kafka-sink-connector.adoc[Sink Docs]
+** camel-cron-kafka-connector
+*** xref:connectors/camel-cron-kafka-source-connector.adoc[Source Docs]
+** camel-crypto-cms-kafka-connector
+*** xref:connectors/camel-crypto-cms-kafka-sink-connector.adoc[Sink Docs]
+** camel-crypto-kafka-connector
+*** xref:connectors/camel-crypto-kafka-sink-connector.adoc[Sink Docs]
+** camel-cxf-kafka-connector
+*** xref:connectors/camel-cxf-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-cxf-kafka-sink-connector.adoc[Sink Docs]
+** camel-cxfrs-kafka-connector
+*** xref:connectors/camel-cxfrs-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-cxfrs-kafka-sink-connector.adoc[Sink Docs]
+** camel-dataformat-kafka-connector
+*** xref:connectors/camel-dataformat-kafka-sink-connector.adoc[Sink Docs]
+** camel-direct-kafka-connector
+*** xref:connectors/camel-direct-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-direct-kafka-sink-connector.adoc[Sink Docs]
+** camel-direct-vm-kafka-connector
+*** xref:connectors/camel-direct-vm-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-direct-vm-kafka-sink-connector.adoc[Sink Docs]
+** camel-disruptor-kafka-connector
+*** xref:connectors/camel-disruptor-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-disruptor-kafka-sink-connector.adoc[Sink Docs]
+** camel-disruptor-vm-kafka-connector
+*** xref:connectors/camel-disruptor-vm-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-disruptor-vm-kafka-sink-connector.adoc[Sink Docs]
+** camel-djl-kafka-connector
+*** xref:connectors/camel-djl-kafka-sink-connector.adoc[Sink Docs]
+** camel-dns-kafka-connector
+*** xref:connectors/camel-dns-kafka-sink-connector.adoc[Sink Docs]
+** camel-docker-kafka-connector
+*** xref:connectors/camel-docker-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-docker-kafka-sink-connector.adoc[Sink Docs]
+** camel-dozer-kafka-connector
+*** xref:connectors/camel-dozer-kafka-sink-connector.adoc[Sink Docs]
+** camel-drill-kafka-connector
+*** xref:connectors/camel-drill-kafka-sink-connector.adoc[Sink Docs]
+** camel-dropbox-kafka-connector
+*** xref:connectors/camel-dropbox-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-dropbox-kafka-sink-connector.adoc[Sink Docs]
+** camel-ehcache-kafka-connector
+*** xref:connectors/camel-ehcache-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ehcache-kafka-sink-connector.adoc[Sink Docs]
+** camel-elasticsearch-rest-kafka-connector
+*** xref:connectors/camel-elasticsearch-rest-kafka-sink-connector.adoc[Sink Docs]
+** camel-elsql-kafka-connector
+*** xref:connectors/camel-elsql-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-elsql-kafka-sink-connector.adoc[Sink Docs]
+** camel-elytron-kafka-connector
+*** xref:connectors/camel-elytron-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-elytron-kafka-sink-connector.adoc[Sink Docs]
+** camel-etcd-keys-kafka-connector
+*** xref:connectors/camel-etcd-keys-kafka-sink-connector.adoc[Sink Docs]
+** camel-etcd-stats-kafka-connector
+*** xref:connectors/camel-etcd-stats-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-etcd-stats-kafka-sink-connector.adoc[Sink Docs]
+** camel-etcd-watch-kafka-connector
+*** xref:connectors/camel-etcd-watch-kafka-source-connector.adoc[Source Docs]
+** camel-exec-kafka-connector
+*** xref:connectors/camel-exec-kafka-sink-connector.adoc[Sink Docs]
+** camel-facebook-kafka-connector
+*** xref:connectors/camel-facebook-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-facebook-kafka-sink-connector.adoc[Sink Docs]
+** camel-fhir-kafka-connector
+*** xref:connectors/camel-fhir-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-fhir-kafka-sink-connector.adoc[Sink Docs]
+** camel-file-kafka-connector
+*** xref:connectors/camel-file-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-file-kafka-sink-connector.adoc[Sink Docs]
+** camel-file-watch-kafka-connector
+*** xref:connectors/camel-file-watch-kafka-source-connector.adoc[Source Docs]
+** camel-flatpack-kafka-connector
+*** xref:connectors/camel-flatpack-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-flatpack-kafka-sink-connector.adoc[Sink Docs]
+** camel-flink-kafka-connector
+*** xref:connectors/camel-flink-kafka-sink-connector.adoc[Sink Docs]
+** camel-fop-kafka-connector
+*** xref:connectors/camel-fop-kafka-sink-connector.adoc[Sink Docs]
+** camel-freemarker-kafka-connector
+*** xref:connectors/camel-freemarker-kafka-sink-connector.adoc[Sink Docs]
+** camel-ftp-kafka-connector
+*** xref:connectors/camel-ftp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ftp-kafka-sink-connector.adoc[Sink Docs]
+** camel-ftps-kafka-connector
+*** xref:connectors/camel-ftps-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ftps-kafka-sink-connector.adoc[Sink Docs]
+** camel-ganglia-kafka-connector
+*** xref:connectors/camel-ganglia-kafka-sink-connector.adoc[Sink Docs]
+** camel-geocoder-kafka-connector
+*** xref:connectors/camel-geocoder-kafka-sink-connector.adoc[Sink Docs]
+** camel-git-kafka-connector
+*** xref:connectors/camel-git-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-git-kafka-sink-connector.adoc[Sink Docs]
+** camel-github-kafka-connector
+*** xref:connectors/camel-github-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-github-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-bigquery-kafka-connector
+*** xref:connectors/camel-google-bigquery-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-bigquery-sql-kafka-connector
+*** xref:connectors/camel-google-bigquery-sql-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-calendar-kafka-connector
+*** xref:connectors/camel-google-calendar-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-google-calendar-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-calendar-stream-kafka-connector
+*** xref:connectors/camel-google-calendar-stream-kafka-source-connector.adoc[Source Docs]
+** camel-google-drive-kafka-connector
+*** xref:connectors/camel-google-drive-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-google-drive-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-mail-kafka-connector
+*** xref:connectors/camel-google-mail-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-google-mail-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-mail-stream-kafka-connector
+*** xref:connectors/camel-google-mail-stream-kafka-source-connector.adoc[Source Docs]
+** camel-google-pubsub-kafka-connector
+*** xref:connectors/camel-google-pubsub-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-google-pubsub-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-sheets-kafka-connector
+*** xref:connectors/camel-google-sheets-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-google-sheets-kafka-sink-connector.adoc[Sink Docs]
+** camel-google-sheets-stream-kafka-connector
+*** xref:connectors/camel-google-sheets-stream-kafka-source-connector.adoc[Source Docs]
+** camel-gora-kafka-connector
+*** xref:connectors/camel-gora-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-gora-kafka-sink-connector.adoc[Sink Docs]
+** camel-grape-kafka-connector
+*** xref:connectors/camel-grape-kafka-sink-connector.adoc[Sink Docs]
+** camel-graphql-kafka-connector
+*** xref:connectors/camel-graphql-kafka-sink-connector.adoc[Sink Docs]
+** camel-grpc-kafka-connector
+*** xref:connectors/camel-grpc-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-grpc-kafka-sink-connector.adoc[Sink Docs]
+** camel-guava-eventbus-kafka-connector
+*** xref:connectors/camel-guava-eventbus-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-guava-eventbus-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-atomicvalue-kafka-connector
+*** xref:connectors/camel-hazelcast-atomicvalue-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-instance-kafka-connector
+*** xref:connectors/camel-hazelcast-instance-kafka-source-connector.adoc[Source Docs]
+** camel-hazelcast-list-kafka-connector
+*** xref:connectors/camel-hazelcast-list-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-list-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-map-kafka-connector
+*** xref:connectors/camel-hazelcast-map-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-map-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-multimap-kafka-connector
+*** xref:connectors/camel-hazelcast-multimap-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-multimap-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-queue-kafka-connector
+*** xref:connectors/camel-hazelcast-queue-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-queue-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-replicatedmap-kafka-connector
+*** xref:connectors/camel-hazelcast-replicatedmap-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-replicatedmap-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-ringbuffer-kafka-connector
+*** xref:connectors/camel-hazelcast-ringbuffer-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-seda-kafka-connector
+*** xref:connectors/camel-hazelcast-seda-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-seda-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-set-kafka-connector
+*** xref:connectors/camel-hazelcast-set-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-set-kafka-sink-connector.adoc[Sink Docs]
+** camel-hazelcast-topic-kafka-connector
+*** xref:connectors/camel-hazelcast-topic-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hazelcast-topic-kafka-sink-connector.adoc[Sink Docs]
+** camel-hbase-kafka-connector
+*** xref:connectors/camel-hbase-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hbase-kafka-sink-connector.adoc[Sink Docs]
+** camel-hdfs-kafka-connector
+*** xref:connectors/camel-hdfs-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-hdfs-kafka-sink-connector.adoc[Sink Docs]
+** camel-http-kafka-connector
+*** xref:connectors/camel-http-kafka-sink-connector.adoc[Sink Docs]
+** camel-https-kafka-connector
+*** xref:connectors/camel-https-kafka-sink-connector.adoc[Sink Docs]
+** camel-iec60870-client-kafka-connector
+*** xref:connectors/camel-iec60870-client-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-iec60870-client-kafka-sink-connector.adoc[Sink Docs]
+** camel-iec60870-server-kafka-connector
+*** xref:connectors/camel-iec60870-server-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-iec60870-server-kafka-sink-connector.adoc[Sink Docs]
+** camel-ignite-cache-kafka-connector
+*** xref:connectors/camel-ignite-cache-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ignite-cache-kafka-sink-connector.adoc[Sink Docs]
+** camel-ignite-compute-kafka-connector
+*** xref:connectors/camel-ignite-compute-kafka-sink-connector.adoc[Sink Docs]
+** camel-ignite-events-kafka-connector
+*** xref:connectors/camel-ignite-events-kafka-source-connector.adoc[Source Docs]
+** camel-ignite-idgen-kafka-connector
+*** xref:connectors/camel-ignite-idgen-kafka-sink-connector.adoc[Sink Docs]
+** camel-ignite-messaging-kafka-connector
+*** xref:connectors/camel-ignite-messaging-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ignite-messaging-kafka-sink-connector.adoc[Sink Docs]
+** camel-ignite-queue-kafka-connector
+*** xref:connectors/camel-ignite-queue-kafka-sink-connector.adoc[Sink Docs]
+** camel-ignite-set-kafka-connector
+*** xref:connectors/camel-ignite-set-kafka-sink-connector.adoc[Sink Docs]
+** camel-imap-kafka-connector
+*** xref:connectors/camel-imap-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-imap-kafka-sink-connector.adoc[Sink Docs]
+** camel-imaps-kafka-connector
+*** xref:connectors/camel-imaps-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-imaps-kafka-sink-connector.adoc[Sink Docs]
+** camel-infinispan-kafka-connector
+*** xref:connectors/camel-infinispan-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-infinispan-kafka-sink-connector.adoc[Sink Docs]
+** camel-influxdb-kafka-connector
+*** xref:connectors/camel-influxdb-kafka-sink-connector.adoc[Sink Docs]
+** camel-iota-kafka-connector
+*** xref:connectors/camel-iota-kafka-sink-connector.adoc[Sink Docs]
+** camel-ipfs-kafka-connector
+*** xref:connectors/camel-ipfs-kafka-sink-connector.adoc[Sink Docs]
+** camel-irc-kafka-connector
+*** xref:connectors/camel-irc-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-irc-kafka-sink-connector.adoc[Sink Docs]
+** camel-ironmq-kafka-connector
+*** xref:connectors/camel-ironmq-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ironmq-kafka-sink-connector.adoc[Sink Docs]
+** camel-jbpm-kafka-connector
+*** xref:connectors/camel-jbpm-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jbpm-kafka-sink-connector.adoc[Sink Docs]
+** camel-jcache-kafka-connector
+*** xref:connectors/camel-jcache-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jcache-kafka-sink-connector.adoc[Sink Docs]
+** camel-jclouds-kafka-connector
+*** xref:connectors/camel-jclouds-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jclouds-kafka-sink-connector.adoc[Sink Docs]
+** camel-jcr-kafka-connector
+*** xref:connectors/camel-jcr-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jcr-kafka-sink-connector.adoc[Sink Docs]
+** camel-jdbc-kafka-connector
+*** xref:connectors/camel-jdbc-kafka-sink-connector.adoc[Sink Docs]
+** camel-jetty-kafka-connector
+*** xref:connectors/camel-jetty-kafka-source-connector.adoc[Source Docs]
+** camel-jgroups-kafka-connector
+*** xref:connectors/camel-jgroups-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jgroups-kafka-sink-connector.adoc[Sink Docs]
+** camel-jgroups-raft-kafka-connector
+*** xref:connectors/camel-jgroups-raft-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jgroups-raft-kafka-sink-connector.adoc[Sink Docs]
+** camel-jing-kafka-connector
+*** xref:connectors/camel-jing-kafka-sink-connector.adoc[Sink Docs]
+** camel-jira-kafka-connector
+*** xref:connectors/camel-jira-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jira-kafka-sink-connector.adoc[Sink Docs]
+** camel-jms-kafka-connector
+*** xref:connectors/camel-jms-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jms-kafka-sink-connector.adoc[Sink Docs]
+** camel-jmx-kafka-connector
+*** xref:connectors/camel-jmx-kafka-source-connector.adoc[Source Docs]
+** camel-jolt-kafka-connector
+*** xref:connectors/camel-jolt-kafka-sink-connector.adoc[Sink Docs]
+** camel-jooq-kafka-connector
+*** xref:connectors/camel-jooq-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jooq-kafka-sink-connector.adoc[Sink Docs]
+** camel-jpa-kafka-connector
+*** xref:connectors/camel-jpa-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jpa-kafka-sink-connector.adoc[Sink Docs]
+** camel-jslt-kafka-connector
+*** xref:connectors/camel-jslt-kafka-sink-connector.adoc[Sink Docs]
+** camel-json-validator-kafka-connector
+*** xref:connectors/camel-json-validator-kafka-sink-connector.adoc[Sink Docs]
+** camel-jsonata-kafka-connector
+*** xref:connectors/camel-jsonata-kafka-sink-connector.adoc[Sink Docs]
+** camel-jt400-kafka-connector
+*** xref:connectors/camel-jt400-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-jt400-kafka-sink-connector.adoc[Sink Docs]
+** camel-kafka-kafka-connector
+*** xref:connectors/camel-kafka-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kafka-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-config-maps-kafka-connector
+*** xref:connectors/camel-kubernetes-config-maps-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-custom-resources-kafka-connector
+*** xref:connectors/camel-kubernetes-custom-resources-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-custom-resources-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-deployments-kafka-connector
+*** xref:connectors/camel-kubernetes-deployments-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-deployments-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-hpa-kafka-connector
+*** xref:connectors/camel-kubernetes-hpa-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-hpa-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-job-kafka-connector
+*** xref:connectors/camel-kubernetes-job-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-job-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-namespaces-kafka-connector
+*** xref:connectors/camel-kubernetes-namespaces-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-namespaces-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-nodes-kafka-connector
+*** xref:connectors/camel-kubernetes-nodes-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-nodes-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-persistent-volumes-claims-kafka-connector
+*** xref:connectors/camel-kubernetes-persistent-volumes-claims-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-persistent-volumes-kafka-connector
+*** xref:connectors/camel-kubernetes-persistent-volumes-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-pods-kafka-connector
+*** xref:connectors/camel-kubernetes-pods-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-pods-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-replication-controllers-kafka-connector
+*** xref:connectors/camel-kubernetes-replication-controllers-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-replication-controllers-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-resources-quota-kafka-connector
+*** xref:connectors/camel-kubernetes-resources-quota-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-secrets-kafka-connector
+*** xref:connectors/camel-kubernetes-secrets-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-service-accounts-kafka-connector
+*** xref:connectors/camel-kubernetes-service-accounts-kafka-sink-connector.adoc[Sink Docs]
+** camel-kubernetes-services-kafka-connector
+*** xref:connectors/camel-kubernetes-services-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-kubernetes-services-kafka-sink-connector.adoc[Sink Docs]
+** camel-kudu-kafka-connector
+*** xref:connectors/camel-kudu-kafka-sink-connector.adoc[Sink Docs]
+** camel-language-kafka-connector
+*** xref:connectors/camel-language-kafka-sink-connector.adoc[Sink Docs]
+** camel-ldap-kafka-connector
+*** xref:connectors/camel-ldap-kafka-sink-connector.adoc[Sink Docs]
+** camel-ldif-kafka-connector
+*** xref:connectors/camel-ldif-kafka-sink-connector.adoc[Sink Docs]
+** camel-log-kafka-connector
+*** xref:connectors/camel-log-kafka-sink-connector.adoc[Sink Docs]
+** camel-lpr-kafka-connector
+*** xref:connectors/camel-lpr-kafka-sink-connector.adoc[Sink Docs]
+** camel-lucene-kafka-connector
+*** xref:connectors/camel-lucene-kafka-sink-connector.adoc[Sink Docs]
+** camel-lumberjack-kafka-connector
+*** xref:connectors/camel-lumberjack-kafka-source-connector.adoc[Source Docs]
+** camel-master-kafka-connector
+*** xref:connectors/camel-master-kafka-source-connector.adoc[Source Docs]
+** camel-metrics-kafka-connector
+*** xref:connectors/camel-metrics-kafka-sink-connector.adoc[Sink Docs]
+** camel-micrometer-kafka-connector
+*** xref:connectors/camel-micrometer-kafka-sink-connector.adoc[Sink Docs]
+** camel-microprofile-metrics-kafka-connector
+*** xref:connectors/camel-microprofile-metrics-kafka-sink-connector.adoc[Sink Docs]
+** camel-milo-client-kafka-connector
+*** xref:connectors/camel-milo-client-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-milo-client-kafka-sink-connector.adoc[Sink Docs]
+** camel-milo-server-kafka-connector
+*** xref:connectors/camel-milo-server-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-milo-server-kafka-sink-connector.adoc[Sink Docs]
+** camel-mina-kafka-connector
+*** xref:connectors/camel-mina-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-mina-kafka-sink-connector.adoc[Sink Docs]
+** camel-minio-kafka-connector
+*** xref:connectors/camel-minio-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-minio-kafka-sink-connector.adoc[Sink Docs]
+** camel-mllp-kafka-connector
+*** xref:connectors/camel-mllp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-mllp-kafka-sink-connector.adoc[Sink Docs]
+** camel-mongodb-gridfs-kafka-connector
+*** xref:connectors/camel-mongodb-gridfs-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-mongodb-gridfs-kafka-sink-connector.adoc[Sink Docs]
+** camel-mongodb-kafka-connector
+*** xref:connectors/camel-mongodb-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-mongodb-kafka-sink-connector.adoc[Sink Docs]
+** camel-msv-kafka-connector
+*** xref:connectors/camel-msv-kafka-sink-connector.adoc[Sink Docs]
+** camel-mustache-kafka-connector
+*** xref:connectors/camel-mustache-kafka-sink-connector.adoc[Sink Docs]
+** camel-mvel-kafka-connector
+*** xref:connectors/camel-mvel-kafka-sink-connector.adoc[Sink Docs]
+** camel-mybatis-bean-kafka-connector
+*** xref:connectors/camel-mybatis-bean-kafka-sink-connector.adoc[Sink Docs]
+** camel-mybatis-kafka-connector
+*** xref:connectors/camel-mybatis-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-mybatis-kafka-sink-connector.adoc[Sink Docs]
+** camel-nagios-kafka-connector
+*** xref:connectors/camel-nagios-kafka-sink-connector.adoc[Sink Docs]
+** camel-nats-kafka-connector
+*** xref:connectors/camel-nats-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-nats-kafka-sink-connector.adoc[Sink Docs]
+** camel-netty-http-kafka-connector
+*** xref:connectors/camel-netty-http-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-netty-http-kafka-sink-connector.adoc[Sink Docs]
+** camel-netty-kafka-connector
+*** xref:connectors/camel-netty-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-netty-kafka-sink-connector.adoc[Sink Docs]
+** camel-nitrite-kafka-connector
+*** xref:connectors/camel-nitrite-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-nitrite-kafka-sink-connector.adoc[Sink Docs]
+** camel-nsq-kafka-connector
+*** xref:connectors/camel-nsq-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-nsq-kafka-sink-connector.adoc[Sink Docs]
+** camel-oaipmh-kafka-connector
+*** xref:connectors/camel-oaipmh-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-oaipmh-kafka-sink-connector.adoc[Sink Docs]
+** camel-olingo2-kafka-connector
+*** xref:connectors/camel-olingo2-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-olingo2-kafka-sink-connector.adoc[Sink Docs]
+** camel-olingo4-kafka-connector
+*** xref:connectors/camel-olingo4-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-olingo4-kafka-sink-connector.adoc[Sink Docs]
+** camel-openshift-build-configs-kafka-connector
+*** xref:connectors/camel-openshift-build-configs-kafka-sink-connector.adoc[Sink Docs]
+** camel-openshift-builds-kafka-connector
+*** xref:connectors/camel-openshift-builds-kafka-sink-connector.adoc[Sink Docs]
+** camel-openstack-cinder-kafka-connector
+*** xref:connectors/camel-openstack-cinder-kafka-sink-connector.adoc[Sink Docs]
+** camel-openstack-glance-kafka-connector
+*** xref:connectors/camel-openstack-glance-kafka-sink-connector.adoc[Sink Docs]
+** camel-openstack-keystone-kafka-connector
+*** xref:connectors/camel-openstack-keystone-kafka-sink-connector.adoc[Sink Docs]
+** camel-openstack-neutron-kafka-connector
+*** xref:connectors/camel-openstack-neutron-kafka-sink-connector.adoc[Sink Docs]
+** camel-openstack-nova-kafka-connector
+*** xref:connectors/camel-openstack-nova-kafka-sink-connector.adoc[Sink Docs]
+** camel-openstack-swift-kafka-connector
+*** xref:connectors/camel-openstack-swift-kafka-sink-connector.adoc[Sink Docs]
+** camel-optaplanner-kafka-connector
+*** xref:connectors/camel-optaplanner-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-optaplanner-kafka-sink-connector.adoc[Sink Docs]
+** camel-paho-kafka-connector
+*** xref:connectors/camel-paho-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-paho-kafka-sink-connector.adoc[Sink Docs]
+** camel-pdf-kafka-connector
+*** xref:connectors/camel-pdf-kafka-sink-connector.adoc[Sink Docs]
+** camel-pg-replication-slot-kafka-connector
+*** xref:connectors/camel-pg-replication-slot-kafka-source-connector.adoc[Source Docs]
+** camel-pgevent-kafka-connector
+*** xref:connectors/camel-pgevent-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-pgevent-kafka-sink-connector.adoc[Sink Docs]
+** camel-platform-http-kafka-connector
+*** xref:connectors/camel-platform-http-kafka-source-connector.adoc[Source Docs]
+** camel-pop3-kafka-connector
+*** xref:connectors/camel-pop3-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-pop3-kafka-sink-connector.adoc[Sink Docs]
+** camel-pop3s-kafka-connector
+*** xref:connectors/camel-pop3s-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-pop3s-kafka-sink-connector.adoc[Sink Docs]
+** camel-pubnub-kafka-connector
+*** xref:connectors/camel-pubnub-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-pubnub-kafka-sink-connector.adoc[Sink Docs]
+** camel-pulsar-kafka-connector
+*** xref:connectors/camel-pulsar-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-pulsar-kafka-sink-connector.adoc[Sink Docs]
+** camel-quartz-kafka-connector
+*** xref:connectors/camel-quartz-kafka-source-connector.adoc[Source Docs]
+** camel-quickfix-kafka-connector
+*** xref:connectors/camel-quickfix-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-quickfix-kafka-sink-connector.adoc[Sink Docs]
+** camel-rabbitmq-kafka-connector
+*** xref:connectors/camel-rabbitmq-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-rabbitmq-kafka-sink-connector.adoc[Sink Docs]
+** camel-reactive-streams-kafka-connector
+*** xref:connectors/camel-reactive-streams-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-reactive-streams-kafka-sink-connector.adoc[Sink Docs]
+** camel-rest-api-kafka-connector
+*** xref:connectors/camel-rest-api-kafka-source-connector.adoc[Source Docs]
+** camel-rest-kafka-connector
+*** xref:connectors/camel-rest-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-rest-kafka-sink-connector.adoc[Sink Docs]
+** camel-rest-openapi-kafka-connector
+*** xref:connectors/camel-rest-openapi-kafka-sink-connector.adoc[Sink Docs]
+** camel-rest-swagger-kafka-connector
+*** xref:connectors/camel-rest-swagger-kafka-sink-connector.adoc[Sink Docs]
+** camel-resteasy-kafka-connector
+*** xref:connectors/camel-resteasy-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-resteasy-kafka-sink-connector.adoc[Sink Docs]
+** camel-rss-kafka-connector
+*** xref:connectors/camel-rss-kafka-source-connector.adoc[Source Docs]
+** camel-saga-kafka-connector
+*** xref:connectors/camel-saga-kafka-sink-connector.adoc[Sink Docs]
+** camel-salesforce-kafka-connector
+*** xref:connectors/camel-salesforce-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-salesforce-kafka-sink-connector.adoc[Sink Docs]
+** camel-sap-netweaver-kafka-connector
+*** xref:connectors/camel-sap-netweaver-kafka-sink-connector.adoc[Sink Docs]
+** camel-scheduler-kafka-connector
+*** xref:connectors/camel-scheduler-kafka-source-connector.adoc[Source Docs]
+** camel-schematron-kafka-connector
+*** xref:connectors/camel-schematron-kafka-sink-connector.adoc[Sink Docs]
+** camel-scp-kafka-connector
+*** xref:connectors/camel-scp-kafka-sink-connector.adoc[Sink Docs]
+** camel-seda-kafka-connector
+*** xref:connectors/camel-seda-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-seda-kafka-sink-connector.adoc[Sink Docs]
+** camel-service-kafka-connector
+*** xref:connectors/camel-service-kafka-source-connector.adoc[Source Docs]
+** camel-servicenow-kafka-connector
+*** xref:connectors/camel-servicenow-kafka-sink-connector.adoc[Sink Docs]
+** camel-servlet-kafka-connector
+*** xref:connectors/camel-servlet-kafka-source-connector.adoc[Source Docs]
+** camel-sftp-kafka-connector
+*** xref:connectors/camel-sftp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-sftp-kafka-sink-connector.adoc[Sink Docs]
+** camel-sip-kafka-connector
+*** xref:connectors/camel-sip-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-sip-kafka-sink-connector.adoc[Sink Docs]
+** camel-sips-kafka-connector
+*** xref:connectors/camel-sips-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-sips-kafka-sink-connector.adoc[Sink Docs]
+** camel-sjms-batch-kafka-connector
+*** xref:connectors/camel-sjms-batch-kafka-source-connector.adoc[Source Docs]
+** camel-sjms-kafka-connector
+*** xref:connectors/camel-sjms-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-sjms-kafka-sink-connector.adoc[Sink Docs]
+** camel-sjms2-kafka-connector
+*** xref:connectors/camel-sjms2-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-sjms2-kafka-sink-connector.adoc[Sink Docs]
+** camel-slack-kafka-connector
+*** xref:connectors/camel-slack-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-slack-kafka-sink-connector.adoc[Sink Docs]
+** camel-smpp-kafka-connector
+*** xref:connectors/camel-smpp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-smpp-kafka-sink-connector.adoc[Sink Docs]
+** camel-smpps-kafka-connector
+*** xref:connectors/camel-smpps-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-smpps-kafka-sink-connector.adoc[Sink Docs]
+** camel-smtp-kafka-connector
+*** xref:connectors/camel-smtp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-smtp-kafka-sink-connector.adoc[Sink Docs]
+** camel-smtps-kafka-connector
+*** xref:connectors/camel-smtps-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-smtps-kafka-sink-connector.adoc[Sink Docs]
+** camel-snmp-kafka-connector
+*** xref:connectors/camel-snmp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-snmp-kafka-sink-connector.adoc[Sink Docs]
+** camel-solr-kafka-connector
+*** xref:connectors/camel-solr-kafka-sink-connector.adoc[Sink Docs]
+** camel-solrcloud-kafka-connector
+*** xref:connectors/camel-solrCloud-kafka-sink-connector.adoc[Sink Docs]
+** camel-solrs-kafka-connector
+*** xref:connectors/camel-solrs-kafka-sink-connector.adoc[Sink Docs]
+** camel-soroush-kafka-connector
+*** xref:connectors/camel-soroush-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-soroush-kafka-sink-connector.adoc[Sink Docs]
+** camel-spark-kafka-connector
+*** xref:connectors/camel-spark-kafka-sink-connector.adoc[Sink Docs]
+** camel-splunk-hec-kafka-connector
+*** xref:connectors/camel-splunk-hec-kafka-sink-connector.adoc[Sink Docs]
+** camel-splunk-kafka-connector
+*** xref:connectors/camel-splunk-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-splunk-kafka-sink-connector.adoc[Sink Docs]
+** camel-spring-batch-kafka-connector
+*** xref:connectors/camel-spring-batch-kafka-sink-connector.adoc[Sink Docs]
+** camel-spring-event-kafka-connector
+*** xref:connectors/camel-spring-event-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-spring-event-kafka-sink-connector.adoc[Sink Docs]
+** camel-spring-integration-kafka-connector
+*** xref:connectors/camel-spring-integration-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-spring-integration-kafka-sink-connector.adoc[Sink Docs]
+** camel-spring-ldap-kafka-connector
+*** xref:connectors/camel-spring-ldap-kafka-sink-connector.adoc[Sink Docs]
+** camel-spring-redis-kafka-connector
+*** xref:connectors/camel-spring-redis-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-spring-redis-kafka-sink-connector.adoc[Sink Docs]
+** camel-spring-ws-kafka-connector
+*** xref:connectors/camel-spring-ws-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-spring-ws-kafka-sink-connector.adoc[Sink Docs]
+** camel-sql-kafka-connector
+*** xref:connectors/camel-sql-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-sql-kafka-sink-connector.adoc[Sink Docs]
+** camel-sql-stored-kafka-connector
+*** xref:connectors/camel-sql-stored-kafka-sink-connector.adoc[Sink Docs]
+** camel-ssh-kafka-connector
+*** xref:connectors/camel-ssh-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-ssh-kafka-sink-connector.adoc[Sink Docs]
+** camel-stax-kafka-connector
+*** xref:connectors/camel-stax-kafka-sink-connector.adoc[Sink Docs]
+** camel-stomp-kafka-connector
+*** xref:connectors/camel-stomp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-stomp-kafka-sink-connector.adoc[Sink Docs]
+** camel-stream-kafka-connector
+*** xref:connectors/camel-stream-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-stream-kafka-sink-connector.adoc[Sink Docs]
+** camel-string-template-kafka-connector
+*** xref:connectors/camel-string-template-kafka-sink-connector.adoc[Sink Docs]
+** camel-stub-kafka-connector
+*** xref:connectors/camel-stub-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-stub-kafka-sink-connector.adoc[Sink Docs]
+** camel-syslog-kafka-connector
+*** xref:connectors/camel-syslog-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-syslog-kafka-sink-connector.adoc[Sink Docs]
+** camel-telegram-kafka-connector
+*** xref:connectors/camel-telegram-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-telegram-kafka-sink-connector.adoc[Sink Docs]
+** camel-thrift-kafka-connector
+*** xref:connectors/camel-thrift-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-thrift-kafka-sink-connector.adoc[Sink Docs]
+** camel-tika-kafka-connector
+*** xref:connectors/camel-tika-kafka-sink-connector.adoc[Sink Docs]
+** camel-timer-kafka-connector
+*** xref:connectors/camel-timer-kafka-source-connector.adoc[Source Docs]
+** camel-twilio-kafka-connector
+*** xref:connectors/camel-twilio-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-twilio-kafka-sink-connector.adoc[Sink Docs]
+** camel-twitter-directmessage-kafka-connector
+*** xref:connectors/camel-twitter-directmessage-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-twitter-directmessage-kafka-sink-connector.adoc[Sink Docs]
+** camel-twitter-search-kafka-connector
+*** xref:connectors/camel-twitter-search-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-twitter-search-kafka-sink-connector.adoc[Sink Docs]
+** camel-twitter-timeline-kafka-connector
+*** xref:connectors/camel-twitter-timeline-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-twitter-timeline-kafka-sink-connector.adoc[Sink Docs]
+** camel-undertow-kafka-connector
+*** xref:connectors/camel-undertow-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-undertow-kafka-sink-connector.adoc[Sink Docs]
+** camel-validator-kafka-connector
+*** xref:connectors/camel-validator-kafka-sink-connector.adoc[Sink Docs]
+** camel-velocity-kafka-connector
+*** xref:connectors/camel-velocity-kafka-sink-connector.adoc[Sink Docs]
+** camel-vertx-http-kafka-connector
+*** xref:connectors/camel-vertx-http-kafka-sink-connector.adoc[Sink Docs]
+** camel-vertx-kafka-connector
+*** xref:connectors/camel-vertx-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-vertx-kafka-sink-connector.adoc[Sink Docs]
+** camel-vertx-kafka-kafka-connector
+*** xref:connectors/camel-vertx-kafka-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-vertx-kafka-kafka-sink-connector.adoc[Sink Docs]
+** camel-vertx-websocket-kafka-connector
+*** xref:connectors/camel-vertx-websocket-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-vertx-websocket-kafka-sink-connector.adoc[Sink Docs]
+** camel-vm-kafka-connector
+*** xref:connectors/camel-vm-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-vm-kafka-sink-connector.adoc[Sink Docs]
+** camel-weather-kafka-connector
+*** xref:connectors/camel-weather-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-weather-kafka-sink-connector.adoc[Sink Docs]
+** camel-web3j-kafka-connector
+*** xref:connectors/camel-web3j-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-web3j-kafka-sink-connector.adoc[Sink Docs]
+** camel-webhook-kafka-connector
+*** xref:connectors/camel-webhook-kafka-source-connector.adoc[Source Docs]
+** camel-websocket-jsr356-kafka-connector
+*** xref:connectors/camel-websocket-jsr356-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-websocket-jsr356-kafka-sink-connector.adoc[Sink Docs]
+** camel-websocket-kafka-connector
+*** xref:connectors/camel-websocket-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-websocket-kafka-sink-connector.adoc[Sink Docs]
+** camel-weka-kafka-connector
+*** xref:connectors/camel-weka-kafka-sink-connector.adoc[Sink Docs]
+** camel-wordpress-kafka-connector
+*** xref:connectors/camel-wordpress-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-wordpress-kafka-sink-connector.adoc[Sink Docs]
+** camel-workday-kafka-connector
+*** xref:connectors/camel-workday-kafka-sink-connector.adoc[Sink Docs]
+** camel-xchange-kafka-connector
+*** xref:connectors/camel-xchange-kafka-sink-connector.adoc[Sink Docs]
+** camel-xj-kafka-connector
+*** xref:connectors/camel-xj-kafka-sink-connector.adoc[Sink Docs]
+** camel-xmlsecurity-sign-kafka-connector
+*** xref:connectors/camel-xmlsecurity-sign-kafka-sink-connector.adoc[Sink Docs]
+** camel-xmlsecurity-verify-kafka-connector
+*** xref:connectors/camel-xmlsecurity-verify-kafka-sink-connector.adoc[Sink Docs]
+** camel-xmpp-kafka-connector
+*** xref:connectors/camel-xmpp-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-xmpp-kafka-sink-connector.adoc[Sink Docs]
+** camel-xquery-kafka-connector
+*** xref:connectors/camel-xquery-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-xquery-kafka-sink-connector.adoc[Sink Docs]
+** camel-xslt-kafka-connector
+*** xref:connectors/camel-xslt-kafka-sink-connector.adoc[Sink Docs]
+** camel-xslt-saxon-kafka-connector
+*** xref:connectors/camel-xslt-saxon-kafka-sink-connector.adoc[Sink Docs]
+** camel-yammer-kafka-connector
+*** xref:connectors/camel-yammer-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-yammer-kafka-sink-connector.adoc[Sink Docs]
+** camel-zendesk-kafka-connector
+*** xref:connectors/camel-zendesk-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-zendesk-kafka-sink-connector.adoc[Sink Docs]
+** camel-zookeeper-kafka-connector
+*** xref:connectors/camel-zookeeper-kafka-source-connector.adoc[Source Docs]
+*** xref:connectors/camel-zookeeper-kafka-sink-connector.adoc[Sink Docs]
+** camel-zookeeper-master-kafka-connector
+*** xref:connectors/camel-zookeeper-master-kafka-source-connector.adoc[Source Docs]
+// connectors: END
 * xref:camel-compatibility-matrix.adoc[Apache Camel compatibility matrix]
 * xref:testing.adoc[Testing]
 * xref:troubleshooting.adoc[Troubleshooting]
diff --git a/tooling/camel-kafka-connector-docs-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/docs/UpdateDocComponentsListMojo.java b/tooling/camel-kafka-connector-docs-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/docs/UpdateDocComponentsListMojo.java
index 0afb733..a9163bd 100644
--- a/tooling/camel-kafka-connector-docs-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/docs/UpdateDocComponentsListMojo.java
+++ b/tooling/camel-kafka-connector-docs-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/docs/UpdateDocComponentsListMojo.java
@@ -169,8 +169,8 @@ public class UpdateDocComponentsListMojo extends AbstractMojo {
                 tableModel.setOptions(options);
             }
         }
-        File docFolderWebsite = new File(projectBaseDir, "docs/modules/ROOT/pages/");
-        File docFileWebsite = new File(docFolderWebsite, "connectors.adoc");
+        File docFolderWebsite = new File(projectBaseDir, "docs/modules/ROOT/");
+        File docFileWebsite = new File(docFolderWebsite, "pages/connectors.adoc");
         String changed = templateConnnectorsTable(tableModel);
         boolean updated = updateConnectorsTable(docFileWebsite, changed);
         if (updated) {
@@ -178,6 +178,13 @@ public class UpdateDocComponentsListMojo extends AbstractMojo {
         } else {
             getLog().debug("No changes to connectors table file: " + docFileWebsite);
         }
+        File navWebsite = new File(docFolderWebsite, "nav.adoc");
+        boolean navUpdated = updateNav(navWebsite, tableModel);
+        if (navUpdated) {
+            getLog().info("Updated nav file: " + navWebsite);
+        } else {
+            getLog().debug("No changes to nav file: " + navWebsite);
+        }
     }
 
     private String templateConnnectorsTable(CamelKafkaConnectorTableModel model) throws MojoExecutionException {
@@ -228,4 +235,42 @@ public class UpdateDocComponentsListMojo extends AbstractMojo {
             throw new MojoExecutionException("Error reading file " + file + " Reason: " + e, e);
         }
     }
+
+    private boolean updateNav(File file, CamelKafkaConnectorTableModel model) throws MojoExecutionException {
+        String changed = null;
+        try {
+            String template = loadText(UpdateDocComponentsListMojo.class.getClassLoader().getResourceAsStream("nav.mvel"));
+            changed = (String)TemplateRuntime.eval(template, model, Collections.singletonMap("util", MvelHelper.INSTANCE));
+        } catch (Exception e) {
+            throw new MojoExecutionException("Error processing mvel template. Reason: " + e, e);
+        }
+
+        try {
+            String text = loadText(file);
+
+            String existing = Strings.between(text, "// connectors: START", "// connectors: END");
+            if (existing != null) {
+                // remove leading line breaks etc
+                existing = existing.trim();
+                changed = changed.trim();
+                if (existing.equals(changed)) {
+                    return false;
+                } else {
+                    String before = Strings.before(text, "// connectors: START");
+                    String after = Strings.after(text, "// connectors: END");
+                    text = before + "// connectors: START\n" + changed + "\n// connectors: END" + after;
+                    writeText(file, text);
+                    return true;
+                }
+            } else {
+                getLog().warn("Cannot find markers in file " + file);
+                getLog().warn("Add the following markers");
+                getLog().warn("\t// connectors: START");
+                getLog().warn("\t// connectors: END");
+                return false;
+            }
+        } catch (Exception e) {
+            throw new MojoExecutionException("Error reading file " + file + " Reason: " + e, e);
+        }
+    }
 }
diff --git a/tooling/camel-kafka-connector-docs-maven-plugin/src/main/resources/nav.mvel b/tooling/camel-kafka-connector-docs-maven-plugin/src/main/resources/nav.mvel
new file mode 100644
index 0000000..134b78a
--- /dev/null
+++ b/tooling/camel-kafka-connector-docs-maven-plugin/src/main/resources/nav.mvel
@@ -0,0 +1,8 @@
+@if{!options.isEmpty()}
+
+@foreach{connector : options}** ${connector.getName()}
+@if{connector.isSource()}*** ${connector.getDocsSource()}
+@end{}@if{connector.isSink()}*** ${connector.getDocsSink()}
+@end{}@end{}
+
+@end{}


[camel-kafka-connector] 01/05: Added a basic test for idempotency

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master-master-align
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 046ca03c770b54247657658dc718325480683ac0
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 2 17:43:09 2021 +0100

    Added a basic test for idempotency
---
 .../common/BasicConnectorPropertyFactory.java      |   4 +
 .../common/IdempotencyConfigBuilder.java           |  78 +++++++++
 .../kafkaconnector/sjms2/clients/JMSClient.java    |  43 +++--
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 192 +++++++++++++++++++++
 4 files changed, 307 insertions(+), 10 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index a9d012e..0e98490 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -63,6 +63,10 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp
         return (T) this;
     }
 
+    public IdempotencyConfigBuilder<T> withIdempotency() {
+        return new IdempotencyConfigBuilder<>((T) this, connectorProps);
+    }
+
     /**
      * This enables sending failed records to the DLQ. Note: it automatically configure other required/recommended
      * options!
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java
new file mode 100644
index 0000000..2cde885
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common;
+
+import java.util.Properties;
+
+public class IdempotencyConfigBuilder<T extends ConnectorPropertyFactory> {
+    private final T handle;
+    private final Properties properties;
+
+    public IdempotencyConfigBuilder(T handle, Properties properties) {
+        this.handle = handle;
+        this.properties = properties;
+
+        withEnabled(true);
+    }
+
+    private IdempotencyConfigBuilder<T> withEntry(String key, Object value) {
+        properties.put("camel.idempotency." + key, value);
+
+        return this;
+    }
+
+    public IdempotencyConfigBuilder<T> withEnabled(boolean value) {
+        return withEntry("enabled", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withRepositoryType(String value) {
+        return withEntry("repository.type", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withExpressionType(String value) {
+        return withEntry("expression.type", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withExpressionHeader(String value) {
+        return withEntry("expression.header", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withMemoryDimension(String value) {
+        return withEntry("memory.dimension", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaTopic(String value) {
+        return withEntry("kafka.topic", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaBootstrapServers(String value) {
+        return withEntry("kafka.bootstrap.servers", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaMaxCacheSize(String value) {
+        return withEntry("kafka.max.cache.size", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaPollDurationMs(String value) {
+        return withEntry("kafka.poll.duration.ms", value);
+    }
+
+    public T end() {
+        return handle;
+    }
+}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
index 43586c6..42e4103 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
@@ -161,6 +161,37 @@ public class JMSClient {
         }
     }
 
+    /**
+     * Receives data from a JMS queue or topic
+     *
+     * @param predicate the predicate used to test each received message
+     * @throws JMSException
+     */
+    public void receive(MessageConsumer consumer, Predicate<Message> predicate, long timeout) throws JMSException {
+        while (true) {
+            final Message message = consumer.receive(timeout);
+
+            if (!predicate.test(message)) {
+                return;
+            }
+        }
+    }
+
+
+    /**
+     * Receives data from a JMS queue or topic
+     *
+     * @param predicate the predicate used to test each received message
+     * @throws JMSException
+     */
+    public void receive(MessageConsumer consumer, Predicate<Message> predicate) throws JMSException {
+        receive(consumer, predicate, 3000);
+    }
+
+    public MessageConsumer createConsumer(String queue) throws JMSException {
+        return session.createConsumer(createDestination(queue));
+    }
+
 
     /**
      * Receives data from a JMS queue or topic
@@ -170,20 +201,12 @@ public class JMSClient {
      * @throws JMSException
      */
     public void receive(final String queue, Predicate<Message> predicate) throws JMSException {
-        final long timeout = 3000;
-
         MessageConsumer consumer = null;
 
         try {
-            consumer = session.createConsumer(createDestination(queue));
-
-            while (true) {
-                final Message message = consumer.receive(timeout);
+            consumer = createConsumer(queue);
 
-                if (!predicate.test(message)) {
-                    return;
-                }
-            }
+            receive(consumer, predicate);
         } finally {
             capturingClose(consumer);
         }
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
new file mode 100644
index 0000000..8eceee2
--- /dev/null
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sjms2.sink;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
+import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
+import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
+import org.apache.camel.test.infra.messaging.services.MessagingService;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration tests for the JMS sink using idempotent features
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static MessagingService jmsService = MessagingServiceBuilder
+            .newBuilder(DispatchRouterContainer::new)
+            .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+            .build();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);
+
+    private String topic;
+    private int received;
+    private final int expect = 10;
+
+    private Properties connectionProperties() {
+        Properties properties = new Properties();
+
+        properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+        properties.put("camel.component.sjms2.connection-factory.remoteURI", jmsService.defaultEndpoint());
+
+        return properties;
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sjms2-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
+        received = 0;
+        topic = TestUtils.getDefaultTestTopic(this.getClass());
+    }
+
+    private boolean checkRecord(Message jmsMessage) {
+        if (jmsMessage instanceof TextMessage) {
+            try {
+                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
+
+                received++;
+
+                return true;
+            } catch (JMSException e) {
+                LOG.error("Failed to read message: {}", e.getMessage(), e);
+                fail("Failed to read message: " + e.getMessage());
+            }
+        }
+
+        return false;
+    }
+
+
+    private void consumeJMSMessages() {
+        JMSClient jmsClient = null;
+
+        try {
+            jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
+            jmsClient.start();
+
+            try (MessageConsumer consumer = jmsClient.createConsumer(SJMS2Common.DEFAULT_JMS_QUEUE)) {
+                // number of retries until stale
+                int retries = 10;
+
+                while (retries > 0) {
+                    LOG.debug("Waiting for JMS messages (received {} of {} / retry {})", received, expect, retries);
+                    jmsClient.receive(consumer, this::checkRecord, 1000);
+
+                    // Once staled for 'retries', then it means no more data to receive (hopefully)
+                    if (expect == received) {
+                        retries--;
+                    } else {
+                        retries = 10;
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted, stopping ...");
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        } finally {
+            if (jmsClient != null) {
+                jmsClient.stop();
+            }
+        }
+    }
+
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        LOG.debug("Creating the consumer ...");
+        service.submit(() -> consumeJMSMessages());
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            LOG.debug("Sending message 1/2");
+            kafkaClient.produce(topic, "Sink test message " + i);
+            LOG.debug("Sending message 2/2");
+            kafkaClient.produce(topic, "Sink test message " + i);
+        }
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
+
+        if (service.awaitTermination(25, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testIdempotentBodySendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(topic)
+                    .withConnectionProperties(connectionProperties())
+                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .withIdempotency()
+                        .withRepositoryType("memory")
+                        .withExpressionType("body")
+                        .end();
+
+            runTest(connectorPropertyFactory);
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}