You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/12/01 12:03:52 UTC

[camel-kafka-connector] branch it-tests-sql created (now 286a6ba)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch it-tests-sql
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 286a6ba  Added SQL IT Test to Tests POM

This branch includes the following new commits:

     new 888c54c  Added Camel-SQL Kafka Connector integration tests
     new 46bc367  IT Test SQL: Fixed CS
     new 286a6ba  Added SQL IT Test to Tests POM

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/03: Added Camel-SQL Kafka Connector integration tests

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch it-tests-sql
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 888c54cbbbc70c432e3862122f0d4cd6b2556827
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Dec 1 11:29:47 2020 +0100

    Added Camel-SQL Kafka Connector integration tests
---
 tests/itests-sql/pom.xml                           |  60 +++++++++
 .../kafkaconnector/sql/client/DatabaseClient.java  |  70 ++++++++++
 .../sql/services/SQLLocalContainerService.java     |  61 +++++++++
 .../sql/services/SQLRemoteService.java             |  41 ++++++
 .../kafkaconnector/sql/services/SQLService.java    |  47 +++++++
 .../sql/services/SQLServiceFactory.java            |  44 ++++++
 .../sql/services/TestDataSource.java               |  41 ++++++
 .../sql/sink/CamelSinkSQLITCase.java               | 149 +++++++++++++++++++++
 .../sql/sink/CamelSqlPropertyFactory.java          |  44 ++++++
 .../sql/source/CamelSourceSQLITCase.java           |  93 +++++++++++++
 .../sql/source/CamelSqlPropertyFactory.java        |  44 ++++++
 tests/itests-sql/src/test/resources/schema.sql     |  24 ++++
 12 files changed, 718 insertions(+)

diff --git a/tests/itests-sql/pom.xml b/tests/itests-sql/pom.xml
new file mode 100644
index 0000000..dbcf4af
--- /dev/null
+++ b/tests/itests-sql/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-sql</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: SQL</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-sql</artifactId>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+</project>
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/client/DatabaseClient.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/client/DatabaseClient.java
new file mode 100644
index 0000000..f3e8c21
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/client/DatabaseClient.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.client;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.function.Consumer;
+
+import org.postgresql.ds.PGSimpleDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DatabaseClient {
+    private static final Logger LOG = LoggerFactory.getLogger(DatabaseClient.class);
+    private PGSimpleDataSource datasource;
+    private final Connection connection;
+
+    public DatabaseClient(String url) throws SQLException {
+        LOG.info("Opening a new database connection using the URL {}", url);
+
+        datasource = new PGSimpleDataSource();
+        datasource.setURL(url);
+        datasource.setUser("ckc");
+        datasource.setPassword("ckcDevel123");
+        connection = datasource.getConnection();
+    }
+
+    public void runQuery(String query, Consumer<ResultSet> consumer) throws SQLException {
+        try (ResultSet rs = connection.prepareStatement(query).executeQuery()) {
+
+            while (rs.next()) {
+                consumer.accept(rs);
+            }
+        }
+    }
+
+    public int count(String table) throws SQLException {
+        String query = String.format("select count(*) as count from %s", table);
+
+        try (ResultSet rs = connection.prepareStatement(query).executeQuery()) {
+            while (rs.next()) {
+                return rs.getInt("count");
+            }
+        }
+
+        return 0;
+    }
+
+    public boolean hasAtLeastRecords(String table, int expected) throws SQLException {
+        int count = count(table);
+
+        return count >= expected;
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java
new file mode 100644
index 0000000..a907d32
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+public class SQLLocalContainerService implements SQLService {
+    private static final Logger LOG = LoggerFactory.getLogger(SQLLocalContainerService.class);
+
+    private static JdbcDatabaseContainer container;
+
+    public SQLLocalContainerService() {
+        container = new PostgreSQLContainer()
+                .withDatabaseName("camel")
+                .withUsername("ckc")
+                .withPassword("ckcDevel123")
+                .withInitScript("schema.sql")
+                .withStartupTimeoutSeconds(60);
+
+        container.start();
+
+        System.setProperty("sql.url", container.getJdbcUrl());
+    }
+
+    @Override
+    public String sqlUrl() {
+        return container.getJdbcUrl();
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("Database instance available via JDBC url {}", container.getJdbcUrl());
+    }
+
+    @Override
+    public void shutdown() {
+    	System.err.println("Shutdown");
+        LOG.info("Stopping the database instance");
+        container.stop();
+    }
+    
+    
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLRemoteService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLRemoteService.java
new file mode 100644
index 0000000..47fd863
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLRemoteService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.services;
+
+public class SQLRemoteService implements SQLService {
+    private static final String CONNECTION_URL;
+
+    static {
+        CONNECTION_URL = System.getProperty("sql.connection.url");
+    }
+
+    @Override
+    public String sqlUrl() {
+        return CONNECTION_URL;
+    }
+
+    @Override
+    public void initialize() {
+        // NO-OP
+    }
+
+    @Override
+    public void shutdown() {
+        // NO-OP
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java
new file mode 100644
index 0000000..2e389a1
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface SQLService extends BeforeAllCallback, AfterAllCallback {
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+    String sqlUrl();
+
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java
new file mode 100644
index 0000000..e4268f0
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class SQLServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(SQLServiceFactory.class);
+
+    private SQLServiceFactory() {
+    }
+
+
+	public static SQLService createService() {
+        String instanceType = System.getProperty("sql.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-sql-container")) {
+           return new SQLLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new SQLRemoteService();
+        }
+
+        LOG.error("SQL instance must be one of 'local-sql-container' or 'remote");
+        throw new UnsupportedOperationException("Invalid SQL instance type: " + instanceType);
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/TestDataSource.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/TestDataSource.java
new file mode 100644
index 0000000..13eee72
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/TestDataSource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.services;
+
+import org.postgresql.ds.PGSimpleDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestDataSource extends PGSimpleDataSource {
+    private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class);
+
+    private static final String URL;
+
+    static {
+        URL = System.getProperty("sql.url");
+    }
+
+    public TestDataSource() {
+        super();
+        setUrl(URL);
+
+        setUser("ckc");
+        setPassword("ckcDevel123");
+
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
new file mode 100644
index 0000000..b61fa96
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.sink;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
+import org.apache.camel.kafkaconnector.sql.services.SQLService;
+import org.apache.camel.kafkaconnector.sql.services.SQLServiceFactory;
+import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Testcontainers
+public class CamelSinkSQLITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSQLITCase.class);
+
+    @RegisterExtension
+    public SQLService sqlService = SQLServiceFactory.createService();
+
+    private final int expect = 1;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sql-kafka-connector"};
+    }
+
+    private void putRecords(CountDownLatch latch) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        try {
+            for (int i = 0; i < expect; i++) {
+                Map<String, String> sqlParameters = new HashMap<>();
+
+                // The prefix 'CamelHeader' is removed by the SinkTask
+                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+
+                try {
+                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test", sqlParameters);
+                } catch (ExecutionException e) {
+                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    private void verifyData(ResultSet rs) {
+        try {
+            received++;
+            String testName = rs.getString("test_name");
+            String testData = rs.getString("test_data");
+
+            assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName));
+            assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData));
+
+        } catch (SQLException e) {
+            LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e);
+            fail(String.format("Unable to fetch record from result set: %s", e.getMessage()));
+        }
+    }
+
+    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
+        propertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        service.submit(() -> putRecords(latch));
+
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            fail("Timed out wait for data to be added to the Kafka cluster");
+        }
+
+        LOG.debug("Waiting for indices");
+
+        try {
+            DatabaseClient client = new DatabaseClient(sqlService.sqlUrl());
+
+            TestUtils.waitFor(() -> {
+                try {
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
+                }
+            });
+
+            client.runQuery("select * from test", this::verifyData);
+        } catch (SQLException e) {
+            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+
+        assertEquals(expect, received, "Did not receive the same amount of messages sent");
+        LOG.debug("Created the consumer ... About to receive messages");
+    }
+
+    @Test
+    public void testDBFetch() throws ExecutionException, InterruptedException {
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic()
+                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
+
+        runTest(factory);
+
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java
new file mode 100644
index 0000000..f7df3fc
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.sink;
+
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<CamelSqlPropertyFactory> {
+    private CamelSqlPropertyFactory() {
+
+    }
+
+    public CamelSqlPropertyFactory withDataSource(String value) {
+        return setProperty("camel.component.sql.dataSource", value);
+    }
+
+    public CamelSqlPropertyFactory withQuery(String value) {
+        return setProperty("camel.sink.path.query", value);
+    }
+
+    public static CamelSqlPropertyFactory basic() {
+        return new CamelSqlPropertyFactory()
+                .withName("CamelSQLSinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
new file mode 100644
index 0000000..8b877bf
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.source;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
+import org.apache.camel.kafkaconnector.sql.services.SQLService;
+import org.apache.camel.kafkaconnector.sql.services.SQLServiceFactory;
+import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.ExecutionException;
+
+@Testcontainers
+public class CamelSourceSQLITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSQLITCase.class);
+
+    @RegisterExtension
+    public SQLService sqlService = SQLServiceFactory.createService();
+
+    private final int expect = 1;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sql-kafka-connector"};
+    }
+
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+
+        LOG.debug("Received: {}", record.value());
+        received++;
+
+        return false;
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        assertEquals(received, expect, "Didn't process the expected amount of messages");
+    }
+
+    @Timeout(10)
+    @Test
+    public void testDBFetch() throws ExecutionException, InterruptedException {
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic()
+                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withQuery("select * from test")
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
+
+        runTest(factory);
+
+    }
+}
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java
new file mode 100644
index 0000000..5618cf4
--- /dev/null
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sql.source;
+
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<CamelSqlPropertyFactory> {
+    private CamelSqlPropertyFactory() {
+
+    }
+
+    public CamelSqlPropertyFactory withDataSource(String value) {
+        return setProperty("camel.component.sql.dataSource", value);
+    }
+
+    public CamelSqlPropertyFactory withQuery(String value) {
+        return setProperty("camel.source.path.query", value);
+    }
+
+    public static CamelSqlPropertyFactory basic() {
+        return new CamelSqlPropertyFactory()
+                .withName("CamelSQLSourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-sql/src/test/resources/schema.sql b/tests/itests-sql/src/test/resources/schema.sql
new file mode 100644
index 0000000..832f6cd
--- /dev/null
+++ b/tests/itests-sql/src/test/resources/schema.sql
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE test (
+    test_name       VARCHAR(128) NOT NULL,
+    test_data       VARCHAR(128) NOT NULL
+);
+
+insert into test(test_name, test_data) values('SomeName','test data');


[camel-kafka-connector] 02/03: IT Test SQL: Fixed CS

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch it-tests-sql
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 46bc36761641a647011e588bc0c9f41279bb5454
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Dec 1 13:01:38 2020 +0100

    IT Test SQL: Fixed CS
---
 .../sql/services/SQLLocalContainerService.java          | 12 +++---------
 .../camel/kafkaconnector/sql/services/SQLService.java   |  1 -
 .../kafkaconnector/sql/services/SQLServiceFactory.java  |  5 ++---
 .../kafkaconnector/sql/sink/CamelSinkSQLITCase.java     |  6 ++----
 .../sql/sink/CamelSqlPropertyFactory.java               |  9 ++-------
 .../kafkaconnector/sql/source/CamelSourceSQLITCase.java | 17 ++++-------------
 .../sql/source/CamelSqlPropertyFactory.java             |  9 ++-------
 7 files changed, 15 insertions(+), 44 deletions(-)

diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java
index a907d32..e94805d 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java
@@ -28,12 +28,7 @@ public class SQLLocalContainerService implements SQLService {
     private static JdbcDatabaseContainer container;
 
     public SQLLocalContainerService() {
-        container = new PostgreSQLContainer()
-                .withDatabaseName("camel")
-                .withUsername("ckc")
-                .withPassword("ckcDevel123")
-                .withInitScript("schema.sql")
-                .withStartupTimeoutSeconds(60);
+        container = new PostgreSQLContainer().withDatabaseName("camel").withUsername("ckc").withPassword("ckcDevel123").withInitScript("schema.sql").withStartupTimeoutSeconds(60);
 
         container.start();
 
@@ -52,10 +47,9 @@ public class SQLLocalContainerService implements SQLService {
 
     @Override
     public void shutdown() {
-    	System.err.println("Shutdown");
+        System.err.println("Shutdown");
         LOG.info("Stopping the database instance");
         container.stop();
     }
-    
-    
+
 }
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java
index 2e389a1..44d48fb 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java
@@ -34,7 +34,6 @@ public interface SQLService extends BeforeAllCallback, AfterAllCallback {
 
     String sqlUrl();
 
-
     @Override
     default void beforeAll(ExtensionContext extensionContext) throws Exception {
         initialize();
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java
index e4268f0..90bd411 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java
@@ -26,12 +26,11 @@ public final class SQLServiceFactory {
     private SQLServiceFactory() {
     }
 
-
-	public static SQLService createService() {
+    public static SQLService createService() {
         String instanceType = System.getProperty("sql.instance.type");
 
         if (instanceType == null || instanceType.equals("local-sql-container")) {
-           return new SQLLocalContainerService();
+            return new SQLLocalContainerService();
         }
 
         if (instanceType.equals("remote")) {
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
index b61fa96..9cd1ff3 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -138,10 +138,8 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
 
     @Test
     public void testDBFetch() throws ExecutionException, InterruptedException {
-        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic()
-                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
-                .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+            .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
 
         runTest(factory);
 
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java
index f7df3fc..3a64a5a 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.sql.sink;
 
-
 import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
 
 public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<CamelSqlPropertyFactory> {
@@ -34,11 +33,7 @@ public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<
     }
 
     public static CamelSqlPropertyFactory basic() {
-        return new CamelSqlPropertyFactory()
-                .withName("CamelSQLSinkConnector")
-                .withTasksMax(1)
-                .withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSinkConnector")
-                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
-                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+        return new CamelSqlPropertyFactory().withName("CamelSQLSinkConnector").withTasksMax(1).withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSinkConnector")
+            .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter").withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
     }
 }
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
index 8b877bf..4dc4b16 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
@@ -17,17 +17,16 @@
 
 package org.apache.camel.kafkaconnector.sql.source;
 
+import java.util.concurrent.ExecutionException;
+
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.sql.services.SQLService;
 import org.apache.camel.kafkaconnector.sql.services.SQLServiceFactory;
 import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -36,12 +35,6 @@ import org.slf4j.LoggerFactory;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.concurrent.ExecutionException;
 
 @Testcontainers
 public class CamelSourceSQLITCase extends AbstractKafkaTest {
@@ -82,10 +75,8 @@ public class CamelSourceSQLITCase extends AbstractKafkaTest {
     @Timeout(10)
     @Test
     public void testDBFetch() throws ExecutionException, InterruptedException {
-        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic()
-                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
-                .withQuery("select * from test")
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+            .withQuery("select * from test").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
 
         runTest(factory);
 
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java
index 5618cf4..4f5100a 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.sql.source;
 
-
 import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
 
 public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<CamelSqlPropertyFactory> {
@@ -34,11 +33,7 @@ public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<
     }
 
     public static CamelSqlPropertyFactory basic() {
-        return new CamelSqlPropertyFactory()
-                .withName("CamelSQLSourceConnector")
-                .withTasksMax(1)
-                .withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSourceConnector")
-                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
-                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+        return new CamelSqlPropertyFactory().withName("CamelSQLSourceConnector").withTasksMax(1).withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSourceConnector")
+            .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter").withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
     }
 }


[camel-kafka-connector] 03/03: Added SQL IT Test to Tests POM

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch it-tests-sql
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 286a6ba32cdeef99c9420b80a8500f29192fe044
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Dec 1 13:03:22 2020 +0100

    Added SQL IT Test to Tests POM
---
 tests/pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/pom.xml b/tests/pom.xml
index f32950b..a5ef718 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -63,6 +63,7 @@
         <module>itests-rabbitmq</module>
         <module>itests-couchbase</module>
         <module>itests-ssh</module>
+        <module>itests-sql</module>
     </modules>