You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:44 UTC

[camel-kafka-connector] 11/18: Convert the JDBC tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 7936a938d51ad4e466bba420a3d1d00d67ccee30
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 13:43:45 2021 +0100

    Convert the JDBC tests to the new reusable sink test base class
---
 .../jdbc/sink/CamelSinkJDBCITCase.java             | 121 ++++++++++-----------
 1 file changed, 58 insertions(+), 63 deletions(-)

diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
index 87752a1..3663890 100644
--- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
@@ -22,22 +22,19 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +46,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkJDBCITCase extends AbstractKafkaTest {
+public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
     @RegisterExtension
     static JDBCService jdbcService;
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJDBCITCase.class);
+    private DatabaseClient client;
+    private String topicName;
 
     private final int expect = 10;
     private int received;
@@ -74,36 +73,69 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
                 .build();
     }
 
+    @BeforeEach
+    public void setUp() throws SQLException {
+        topicName = getTopicForTest(this);
+        client = new DatabaseClient(jdbcService.jdbcUrl());
+        received = 0;
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-jdbc-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+    @Override
+    protected String testMessageContent(int current) {
+        return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+    }
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> jdbcParameters = new HashMap<>();
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> jdbcParameters = new HashMap<>();
 
-                // The prefix 'CamelHeader' is removed by the SinkTask
-                jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-                jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+        // The prefix 'CamelHeader' is removed by the SinkTask
+        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
 
+        return jdbcParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            LOG.debug("Waiting for indices");
+
+            TestUtils.waitFor(() -> {
                 try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
                 }
-            }
+            });
+
         } finally {
             latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(20, TimeUnit.SECONDS)) {
+            try {
+                client.runQuery("select * from test", this::verifyData);
+
+                assertEquals(expect, received, "Did not receive the same amount of messages sent");
+            } catch (SQLException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
     private void verifyData(ResultSet rs) {
         try {
             received++;
@@ -112,58 +144,21 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
 
             assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName));
             assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData));
-
         } catch (SQLException e) {
             LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e);
             fail(String.format("Unable to fetch record from result set: %s", e.getMessage()));
         }
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        try {
-            DatabaseClient client = new DatabaseClient(jdbcService.jdbcUrl());
-
-            TestUtils.waitFor(() -> {
-                try {
-                    return client.hasAtLeastRecords("test", expect);
-                } catch (SQLException e) {
-                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
-                    return false;
-                }
-            });
-
-            client.runQuery("select * from test", this::verifyData);
-        } catch (SQLException e) {
-            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
+    @Timeout(30)
     @Test
-    public void testDBFetch() throws ExecutionException, InterruptedException {
+    public void testDBFetch() throws Exception {
         CamelJDBCPropertyFactory factory = CamelJDBCPropertyFactory.basic()
                 .withDataSource(CamelJDBCPropertyFactory.classRef(TestDataSource.class.getName()))
                 .withDataSourceName("someName")
                 .withUseHeaderAsParameters(true)
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
-        runTest(factory);
+                .withTopics(topicName);
 
+        runTest(factory, topicName, expect);
     }
 }