You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/09 08:24:16 UTC

[camel-kafka-connector] branch master updated: Added tests for JDBC sink

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 54cb1e2  Added tests for JDBC sink
     new e385af8  Merge pull request #316 from orpiske/jdbc-tests
54cb1e2 is described below

commit 54cb1e2530007fefaef5d4d6f0b55e5bab3cb5b2
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Thu Mar 19 20:39:02 2020 +0100

    Added tests for JDBC sink
---
 README.adoc                                        |   3 +-
 parent/pom.xml                                     |  13 ++
 .../common/clients/kafka/KafkaClient.java          |  43 ++++++
 tests/itests-jdbc/pom.xml                          |  62 +++++++++
 .../kafkaconnector/jdbc/client/DatabaseClient.java |  69 ++++++++++
 .../jdbc/services/JDBCLocalContainerService.java   |  61 +++++++++
 .../jdbc/services/JDBCRemoteService.java           |  45 +++++++
 .../kafkaconnector/jdbc/services/JDBCService.java  |  51 +++++++
 .../jdbc/services/JDBCServiceFactory.java          |  43 ++++++
 .../jdbc/services/TestDataSource.java              |  41 ++++++
 .../jdbc/sink/CamelJDBCPropertyFactory.java        |  48 +++++++
 .../jdbc/sink/CamelSinkJDBCITCase.java             | 150 +++++++++++++++++++++
 tests/itests-jdbc/src/test/resources/schema.sql    |  22 +++
 tests/pom.xml                                      |   1 +
 14 files changed, 651 insertions(+), 1 deletion(-)

diff --git a/README.adoc b/README.adoc
index 4ce0f87..2c56074 100644
--- a/README.adoc
+++ b/README.adoc
@@ -57,6 +57,8 @@ for remote testing:
 * cassandra.instance.type
 ** cassandra.host
 ** cassandra.cql3.port
+* jdbc.instance.type
+** jdbc.connection.url
 * jms-service.instance.type
 ** jms.broker.address
 * hdfs.instance.type
@@ -66,7 +68,6 @@ for remote testing:
 ** mongodb.host
 ** mongodb.port
 
-
 Additionally, a few manual tests can be enabled and executed with adequate configuration on the accounts and
 environments used by those services. This is very specific to the nature of each of those services, therefore
 please consult the comments on each of those test cases for the details related to their setup.
diff --git a/parent/pom.xml b/parent/pom.xml
index 8f1dae3..3a0c74b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -32,6 +32,7 @@
         <version.java>1.8</version.java>
         <version.guava>20.0</version.guava>
         <version.javax.annotation-api>1.3.2</version.javax.annotation-api>
+        <version.postgres>42.2.14</version.postgres>
 
         <version.maven.compiler>3.8.1</version.maven.compiler>
         <version.maven.javadoc>3.1.1</version.maven.javadoc>
@@ -301,6 +302,18 @@
                 <scope>test</scope>
             </dependency>
 
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>postgresql</artifactId>
+                <version>${testcontainers-version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.postgresql</groupId>
+                <artifactId>postgresql</artifactId>
+                <version>${version.postgres}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
index feaf4e2..e9846ed 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
@@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.common.clients.kafka;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -32,6 +33,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
 
 /**
  * A very simple test message consumer that can consume messages of different types
@@ -45,6 +47,26 @@ public class KafkaClient<K, V> {
     private KafkaProducer<K, V> producer;
     private KafkaConsumer<K, V> consumer;
 
+    /** Simple {@link Header} whose string value is sent as UTF-8 encoded bytes. */
+    private static class TestHeader implements Header {
+        private final String key;
+        private final String value;
+        public TestHeader(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public String key() {
+            return this.key;
+        }
+
+        @Override
+        public byte[] value() {
+            return value == null ? null : value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        }
+    }
+
 
     /**
      * Constructs the properties using the given bootstrap server
@@ -98,6 +120,27 @@ public class KafkaClient<K, V> {
         future.get();
     }
 
+
+    /**
+     * Sends data to a topic, attaching the given headers to the record
+     *
+     * @param topic   the topic to send data to
+     * @param message the message to send
+     * @param headers the headers to add to the record (header key to string value); may be null
+     * @throws ExecutionException if sending the record fails
+     * @throws InterruptedException if interrupted while waiting for the send to complete
+     */
+    public void produce(String topic, V message, Map<String, String> headers) throws ExecutionException, InterruptedException {
+        ProducerRecord<K, V> record = new ProducerRecord<>(topic, message);
+
+        if (headers != null) {
+            headers.forEach((key, value) -> record.headers().add(new TestHeader(key, value)));
+        }
+
+        // Wait for the send to complete so that failures surface to the caller
+        producer.send(record).get();
+    }
+
     /**
      * Delete a topic
      *
diff --git a/tests/itests-jdbc/pom.xml b/tests/itests-jdbc/pom.xml
new file mode 100644
index 0000000..960ced5
--- /dev/null
+++ b/tests/itests-jdbc/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-jdbc</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: JDBC</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jdbc</artifactId>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/client/DatabaseClient.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/client/DatabaseClient.java
new file mode 100644
index 0000000..6f2f452
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/client/DatabaseClient.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.client;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.function.Consumer;
+
+import org.postgresql.ds.PGSimpleDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Small JDBC helper used by the tests to query the PostgreSQL test database. */
+public class DatabaseClient {
+    private static final Logger LOG = LoggerFactory.getLogger(DatabaseClient.class);
+    private final PGSimpleDataSource datasource;
+    private final Connection connection;
+
+    public DatabaseClient(String url) throws SQLException {
+        LOG.info("Opening a new database connection using the URL {}", url);
+
+        datasource = new PGSimpleDataSource();
+        datasource.setURL(url);
+        datasource.setUser("ckc");
+        datasource.setPassword("ckcDevel123");
+        connection = datasource.getConnection();
+    }
+
+    /** Runs the query and passes the result set, positioned on each row in turn, to the consumer. */
+    public void runQuery(String query, Consumer<ResultSet> consumer) throws SQLException {
+        // Close the statement and the result set as soon as the rows have been consumed
+        try (java.sql.PreparedStatement ps = connection.prepareStatement(query); ResultSet rs = ps.executeQuery()) {
+            while (rs.next()) {
+                consumer.accept(rs);
+            }
+        }
+    }
+
+    /** Returns the number of rows in the table; the name is interpolated as-is, so pass trusted names only. */
+    public int count(String table) throws SQLException {
+        String query = String.format("select count(*) as count from %s", table);
+
+        try (java.sql.PreparedStatement ps = connection.prepareStatement(query); ResultSet rs = ps.executeQuery()) {
+            return rs.next() ? rs.getInt("count") : 0;
+        }
+    }
+
+    /** Tells whether the table currently holds at least the expected number of rows. */
+    public boolean hasAtLeastRecords(String table, int expected) throws SQLException {
+        int actual = count(table);
+        return actual >= expected;
+    }
+}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCLocalContainerService.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCLocalContainerService.java
new file mode 100644
index 0000000..e67c72c
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCLocalContainerService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.services;
+
+import java.sql.SQLException;
+
+import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+/** {@link JDBCService} backed by a PostgreSQL container that is started when the service is created. */
+public class JDBCLocalContainerService implements JDBCService {
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCLocalContainerService.class);
+    private final JdbcDatabaseContainer container;
+
+    public JDBCLocalContainerService() {
+        container = new PostgreSQLContainer()
+                .withDatabaseName("camel")
+                .withUsername("ckc")
+                .withPassword("ckcDevel123")
+                .withInitScript("schema.sql")
+                .withStartupTimeoutSeconds(60);
+
+        container.start();
+        // TestDataSource reads this property to locate the database
+        System.setProperty("jdbc.url", container.getJdbcUrl());
+    }
+
+    @Override
+    public DatabaseClient getClient() throws SQLException {
+        return new DatabaseClient(container.getJdbcUrl());
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("Database instance available via JDBC url {}", container.getJdbcUrl());
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.info("Stopping the database instance");
+        container.stop();
+    }
+}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCRemoteService.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCRemoteService.java
new file mode 100644
index 0000000..12956da
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCRemoteService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.services;
+
+import java.sql.SQLException;
+
+import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
+
+public class JDBCRemoteService implements JDBCService {
+    private static final String CONNECTION_URL = System.getProperty("jdbc.connection.url");
+
+    static { // TestDataSource reads "jdbc.url": mirror the remote URL there unless it is already set
+        System.getProperties().computeIfAbsent("jdbc.url", key -> CONNECTION_URL);
+    }
+
+    @Override
+    public void initialize() {
+        // NO-OP
+    }
+
+    @Override
+    public void shutdown() {
+        // NO-OP
+    }
+
+    @Override
+    public DatabaseClient getClient() throws SQLException {
+        return new DatabaseClient(CONNECTION_URL);
+    }
+}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCService.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCService.java
new file mode 100644
index 0000000..c970453
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.services;
+
+import java.sql.SQLException;
+
+import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface JDBCService extends BeforeAllCallback, AfterAllCallback {
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+    /** Returns a {@link DatabaseClient} for the database behind this service. */
+    DatabaseClient getClient() throws SQLException;
+
+    /** Runs {@link #initialize()} when the before-all callback is invoked. */
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCServiceFactory.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCServiceFactory.java
new file mode 100644
index 0000000..ff7ef02
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCServiceFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class JDBCServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCServiceFactory.class);
+
+    private JDBCServiceFactory() {
+    }
+
+    /** Creates the service selected by the "jdbc.instance.type" system property (local container when unset). */
+    public static JDBCService createService() {
+        String instanceType = System.getProperty("jdbc.instance.type");
+
+        if (instanceType == null || "local-jdbc-container".equals(instanceType)) {
+            return new JDBCLocalContainerService();
+        }
+        if ("remote".equals(instanceType)) {
+            return new JDBCRemoteService();
+        }
+
+        LOG.error("JDBC instance must be one of 'local-jdbc-container' or 'remote'");
+        throw new UnsupportedOperationException("Invalid JDBC instance type: " + instanceType);
+    }
+}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/TestDataSource.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/TestDataSource.java
new file mode 100644
index 0000000..114bd67
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/TestDataSource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.services;
+
+import org.postgresql.ds.PGSimpleDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** PostgreSQL data source configured from the "jdbc.url" system property and the test credentials. */
+public class TestDataSource extends PGSimpleDataSource {
+    private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class);
+
+    // Resolved once, when the class is initialized
+    private static final String URL = System.getProperty("jdbc.url");
+
+    /**
+     * Creates the data source pointing at the URL captured from "jdbc.url"
+     */
+    public TestDataSource() {
+        super();
+
+        setUrl(URL);
+        setUser("ckc");
+        setPassword("ckcDevel123");
+    }
+}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelJDBCPropertyFactory.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelJDBCPropertyFactory.java
new file mode 100644
index 0000000..15e4c73
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelJDBCPropertyFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.sink;
+
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/** Fluent builder for the connector properties used by the JDBC sink tests. */
+public final class CamelJDBCPropertyFactory extends SinkConnectorPropertyFactory<CamelJDBCPropertyFactory> {
+    private CamelJDBCPropertyFactory() {
+    }
+
+    public static CamelJDBCPropertyFactory basic() {
+        return new CamelJDBCPropertyFactory()
+                .withName("CamelJDBCSinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.jdbc.CamelJdbcSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    public CamelJDBCPropertyFactory withDataSource(String value) {
+        return setProperty("camel.component.jdbc.dataSource", value);
+    }
+
+    public CamelJDBCPropertyFactory withDataSourceName(String value) {
+        return setProperty("camel.sink.path.dataSourceName", value);
+    }
+
+    public CamelJDBCPropertyFactory withUseHeaderAsParameters(boolean value) {
+        return setProperty("camel.sink.endpoint.useHeadersAsParameters", value);
+    }
+}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
new file mode 100644
index 0000000..9830274
--- /dev/null
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.jdbc.sink;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
+import org.apache.camel.kafkaconnector.jdbc.services.JDBCService;
+import org.apache.camel.kafkaconnector.jdbc.services.JDBCServiceFactory;
+import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Sends insert statements to Kafka with their parameters as headers and checks the rows written by the sink. */
+@Testcontainers
+public class CamelSinkJDBCITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJDBCITCase.class);
+
+    @RegisterExtension
+    public JDBCService jdbcService = JDBCServiceFactory.createService();
+
+    private final int expect = 10;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-jdbc-kafka-connector"};
+    }
+
+    private void putRecords(CountDownLatch latch) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+
+        try {
+            for (int i = 0; i < expect; i++) {
+                Map<String, String> jdbcParameters = new HashMap<>();
+
+                // The prefix 'CamelHeader' is removed by the SinkTask
+                jdbcParameters.put("CamelHeaderTestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+                jdbcParameters.put("CamelHeaderTestData", "test data " + i);
+
+                try {
+                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters);
+                } catch (ExecutionException e) {
+                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // keep the interrupt visible to the executor
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    private void verifyData(ResultSet rs) {
+        try {
+            received++;
+            String testName = rs.getString("test_name");
+            String testData = rs.getString("test_data");
+
+            assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName));
+            assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData));
+        } catch (SQLException e) {
+            LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e);
+            fail(String.format("Unable to fetch record from result set: %s", e.getMessage()));
+        }
+    }
+
+    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
+        propertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        service.submit(() -> putRecords(latch));
+        service.shutdown(); // no further tasks: lets the worker thread exit once putRecords returns
+
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            fail("Timed out waiting for data to be added to the Kafka cluster");
+        }
+
+        LOG.debug("Waiting for the records to reach the database");
+        try {
+            DatabaseClient client = jdbcService.getClient();
+
+            TestUtils.waitFor(() -> {
+                try {
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
+                }
+            });
+
+            client.runQuery("select * from test", this::verifyData);
+        } catch (SQLException e) {
+            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+
+        assertEquals(expect, received, "Did not receive the same amount of messages sent");
+        LOG.debug("Verified {} records in the database", received);
+    }
+
+    @Test
+    public void testDBFetch() throws ExecutionException, InterruptedException {
+        CamelJDBCPropertyFactory factory = CamelJDBCPropertyFactory.basic()
+                .withDataSource(CamelJDBCPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withDataSourceName("someName")
+                .withUseHeaderAsParameters(true)
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
+
+        runTest(factory);
+    }
+}
diff --git a/tests/itests-jdbc/src/test/resources/schema.sql b/tests/itests-jdbc/src/test/resources/schema.sql
new file mode 100644
index 0000000..f94398f
--- /dev/null
+++ b/tests/itests-jdbc/src/test/resources/schema.sql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE test (
+    test_name       VARCHAR(128) NOT NULL,
+    test_data       VARCHAR(128) NOT NULL
+);
\ No newline at end of file
diff --git a/tests/pom.xml b/tests/pom.xml
index f9ed354..90faefa 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -53,6 +53,7 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
+        <module>itests-jdbc</module>
     </modules>