You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:59 UTC
[camel-kafka-connector] 16/22: Convert the SQL tests to the new
reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit fd4565bf9658b19aa815dd0e8c30fe08d5b7cf8e
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:38:01 2021 +0100
Convert the SQL tests to the new reusable sink test base class
---
.../sql/sink/CamelSinkSQLITCase.java | 118 ++++++++++-----------
1 file changed, 55 insertions(+), 63 deletions(-)
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
index 53fcca7..79bf8f9 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -22,20 +22,16 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
import org.apache.camel.test.infra.jdbc.services.JDBCService;
import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
@@ -47,12 +43,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-public class CamelSinkSQLITCase extends AbstractKafkaTest {
+public class CamelSinkSQLITCase extends CamelSinkTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSQLITCase.class);
@RegisterExtension
public JDBCService sqlService;
+ private DatabaseClient client;
+ private String topicName;
private final int expect = 1;
private int received;
@@ -76,30 +74,58 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
return new String[] {"camel-sql-kafka-connector"};
}
- private void putRecords(CountDownLatch latch) {
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ @BeforeEach
+ public void setUp() throws SQLException {
+ topicName = getTopicForTest(this);
+ client = new DatabaseClient(sqlService.jdbcUrl());
+ }
- try {
- for (int i = 0; i < expect; i++) {
- Map<String, String> sqlParameters = new HashMap<>();
+ @Override
+ protected String testMessageContent(int current) {
+ return "test";
+ }
+
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> sqlParameters = new HashMap<>();
- // The prefix 'CamelHeader' is removed by the SinkTask
- sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
- sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+ // The prefix 'CamelHeader' is removed by the SinkTask
+ sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+ sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
+ return sqlParameters;
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ TestUtils.waitFor(() -> {
try {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test", sqlParameters);
- } catch (ExecutionException e) {
- LOG.error("Unable to produce messages: {}", e.getMessage(), e);
- } catch (InterruptedException e) {
- break;
+ return client.hasAtLeastRecords("test", expect);
+ } catch (SQLException e) {
+ LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+ return false;
}
- }
+ });
} finally {
latch.countDown();
}
}
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(25, TimeUnit.SECONDS)) {
+ try {
+ client.runQuery("select * from test", this::verifyData);
+ assertEquals(expect, received, "Did not receive as much data as expected");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+
private void verifyData(ResultSet rs) {
try {
received++;
@@ -115,48 +141,14 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
}
}
- public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
- propertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService service = Executors.newCachedThreadPool();
- service.submit(() -> putRecords(latch));
-
- if (!latch.await(30, TimeUnit.SECONDS)) {
- fail("Timed out wait for data to be added to the Kafka cluster");
- }
-
- LOG.debug("Waiting for indices");
-
- try {
- DatabaseClient client = new DatabaseClient(sqlService.jdbcUrl());
-
- TestUtils.waitFor(() -> {
- try {
- return client.hasAtLeastRecords("test", expect);
- } catch (SQLException e) {
- LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
- return false;
- }
- });
-
- client.runQuery("select * from test", this::verifyData);
- } catch (SQLException e) {
- LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
-
- assertEquals(expect, received, "Did not receive the same amount of messages sent");
- LOG.debug("Created the consumer ... About to receive messages");
- }
-
@Test
- public void testDBFetch() throws ExecutionException, InterruptedException {
- CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
- .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
- runTest(factory);
-
+ public void testDBFetch() throws Exception {
+ CamelSqlPropertyFactory factory = CamelSqlPropertyFactory
+ .basic()
+ .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+ .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
+ .withTopics(topicName);
+
+ runTest(factory, topicName, expect);
}
}