You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:59 UTC

[camel-kafka-connector] 16/22: Convert the SQL tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit fd4565bf9658b19aa815dd0e8c30fe08d5b7cf8e
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:38:01 2021 +0100

    Convert the SQL tests to the new reusable sink test base class
---
 .../sql/sink/CamelSinkSQLITCase.java               | 118 ++++++++++-----------
 1 file changed, 55 insertions(+), 63 deletions(-)

diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
index 53fcca7..79bf8f9 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -22,20 +22,16 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -47,12 +43,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkSQLITCase extends AbstractKafkaTest {
+public class CamelSinkSQLITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSQLITCase.class);
 
     @RegisterExtension
     public JDBCService sqlService;
+    private DatabaseClient client;
 
+    private String topicName;
     private final int expect = 1;
     private int received;
 
@@ -76,30 +74,58 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
         return new String[] {"camel-sql-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @BeforeEach
+    public void setUp() throws SQLException {
+        topicName = getTopicForTest(this);
+        client = new DatabaseClient(sqlService.jdbcUrl());
+    }
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> sqlParameters = new HashMap<>();
+    @Override
+    protected String testMessageContent(int current) {
+        return "test";
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> sqlParameters = new HashMap<>();
 
-                // The prefix 'CamelHeader' is removed by the SinkTask
-                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+        // The prefix 'CamelHeader' is removed by the SinkTask
+        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
 
+        return sqlParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(() -> {
                 try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test", sqlParameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
                 }
-            }
+            });
         } finally {
             latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(25, TimeUnit.SECONDS)) {
+            try {
+                client.runQuery("select * from test", this::verifyData);
+                assertEquals(expect, received, "Did not receive as much data as expected");
+            } catch (SQLException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void verifyData(ResultSet rs) {
         try {
             received++;
@@ -115,48 +141,14 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        try {
-            DatabaseClient client = new DatabaseClient(sqlService.jdbcUrl());
-
-            TestUtils.waitFor(() -> {
-                try {
-                    return client.hasAtLeastRecords("test", expect);
-                } catch (SQLException e) {
-                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
-                    return false;
-                }
-            });
-
-            client.runQuery("select * from test", this::verifyData);
-        } catch (SQLException e) {
-            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
     @Test
-    public void testDBFetch() throws ExecutionException, InterruptedException {
-        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
-            .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
-        runTest(factory);
-
+    public void testDBFetch() throws Exception {
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory
+                .basic()
+                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
+                .withTopics(topicName);
+
+        runTest(factory, topicName, expect);
     }
 }