You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:44 UTC
[camel-kafka-connector] 11/18: Convert the JDBC tests to the new reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 7936a938d51ad4e466bba420a3d1d00d67ccee30
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 13:43:45 2021 +0100
Convert the JDBC tests to the new reusable sink test base class
---
.../jdbc/sink/CamelSinkJDBCITCase.java | 121 ++++++++++-----------
1 file changed, 58 insertions(+), 63 deletions(-)
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
index 87752a1..3663890 100644
--- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
@@ -22,22 +22,19 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource;
import org.apache.camel.test.infra.jdbc.services.JDBCService;
import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +46,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkJDBCITCase extends AbstractKafkaTest {
+public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
@RegisterExtension
static JDBCService jdbcService;
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJDBCITCase.class);
+ private DatabaseClient client;
+ private String topicName;
private final int expect = 10;
private int received;
@@ -74,36 +73,69 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
.build();
}
+ @BeforeEach
+ public void setUp() throws SQLException {
+ topicName = getTopicForTest(this);
+ client = new DatabaseClient(jdbcService.jdbcUrl());
+ received = 0;
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-jdbc-kafka-connector"};
}
- private void putRecords(CountDownLatch latch) {
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+ @Override
+ protected String testMessageContent(int current) {
+ return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+ }
- try {
- for (int i = 0; i < expect; i++) {
- Map<String, String> jdbcParameters = new HashMap<>();
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> jdbcParameters = new HashMap<>();
- // The prefix 'CamelHeader' is removed by the SinkTask
- jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
- jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+ // The prefix 'CamelHeader' is removed by the SinkTask
+ jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+ jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
+ return jdbcParameters;
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ LOG.debug("Waiting for indices");
+
+ TestUtils.waitFor(() -> {
try {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters);
- } catch (ExecutionException e) {
- LOG.error("Unable to produce messages: {}", e.getMessage(), e);
- } catch (InterruptedException e) {
- break;
+ return client.hasAtLeastRecords("test", expect);
+ } catch (SQLException e) {
+ LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+ return false;
}
- }
+ });
+
} finally {
latch.countDown();
}
}
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(20, TimeUnit.SECONDS)) {
+ try {
+ client.runQuery("select * from test", this::verifyData);
+
+ assertEquals(expect, received, "Did not receive the same amount of messages sent");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ } else {
+ fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+ received, expect));
+ }
+ }
+
private void verifyData(ResultSet rs) {
try {
received++;
@@ -112,58 +144,21 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName));
assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData));
-
} catch (SQLException e) {
LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e);
fail(String.format("Unable to fetch record from result set: %s", e.getMessage()));
}
}
- public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
- propertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService service = Executors.newCachedThreadPool();
- service.submit(() -> putRecords(latch));
-
- if (!latch.await(30, TimeUnit.SECONDS)) {
- fail("Timed out wait for data to be added to the Kafka cluster");
- }
-
- LOG.debug("Waiting for indices");
-
- try {
- DatabaseClient client = new DatabaseClient(jdbcService.jdbcUrl());
-
- TestUtils.waitFor(() -> {
- try {
- return client.hasAtLeastRecords("test", expect);
- } catch (SQLException e) {
- LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
- return false;
- }
- });
-
- client.runQuery("select * from test", this::verifyData);
- } catch (SQLException e) {
- LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
-
- assertEquals(expect, received, "Did not receive the same amount of messages sent");
- LOG.debug("Created the consumer ... About to receive messages");
- }
-
+ @Timeout(30)
@Test
- public void testDBFetch() throws ExecutionException, InterruptedException {
+ public void testDBFetch() throws Exception {
CamelJDBCPropertyFactory factory = CamelJDBCPropertyFactory.basic()
.withDataSource(CamelJDBCPropertyFactory.classRef(TestDataSource.class.getName()))
.withDataSourceName("someName")
.withUseHeaderAsParameters(true)
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
- runTest(factory);
+ .withTopics(topicName);
+ runTest(factory, topicName, expect);
}
}