You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:50 UTC
[camel-kafka-connector] 07/22: Convert the ElasticSearch tests to
the new reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 34dec6e33b7b6324fb1b6d57a400e6f47cebbe50
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:07:53 2021 +0100
Convert the ElasticSearch tests to the new reusable sink test base class
---
.../sink/CamelSinkElasticSearchITCase.java | 159 ++++++++-------------
1 file changed, 60 insertions(+), 99 deletions(-)
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
index 8358aac..c80f892 100644
--- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
@@ -17,16 +17,12 @@
package org.apache.camel.kafkaconnector.elasticsearch.sink;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.elasticsearch.clients.ElasticSearchClient;
import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon;
import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchService;
@@ -41,26 +37,28 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
disabledReason = "Hangs when running with the embedded Kafka Connect instance")
-public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
+public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
@RegisterExtension
public static ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService();
private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class);
private ElasticSearchClient client;
+ private String topicName;
private final int expect = 10;
private int received;
private final String transformKey = "index-test";
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-elasticsearch-rest-kafka-connector"};
@@ -68,32 +66,45 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
+ topicName = getTopicForTest(this);
client = new ElasticSearchClient(elasticSearch.getElasticSearchHost(), elasticSearch.getPort(),
ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX);
received = 0;
}
- private void putRecords(CountDownLatch latch) {
- LOG.debug("Sending records to Kafka");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
try {
- for (int i = 0; i < expect; i++) {
- try {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
- } catch (ExecutionException e) {
- LOG.error("Unable to produce messages: {}", e.getMessage(), e);
- } catch (InterruptedException e) {
- break;
- }
- }
+ client.waitForIndex();
+
+ LOG.debug("Waiting for data");
+ client.waitForData(expect);
} finally {
latch.countDown();
}
}
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ SearchHits hits = client.getData();
+ assertNotNull(hits);
+
+ hits.forEach(this::verifyHit);
+ assertEquals(expect, received,
+ "Didn't process the expected amount of messages: " + received + " != " + expect);
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+
private void verifyHit(SearchHit searchHit) {
String source = searchHit.getSourceAsString();
@@ -107,91 +118,41 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
received++;
}
- public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
- propertyFactory.log();
- LOG.debug("Performing initialization");
- getKafkaConnectService().initializeConnector(propertyFactory);
-
- LOG.debug("Initialization complete");
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService service = Executors.newCachedThreadPool();
- service.submit(() -> putRecords(latch));
-
- LOG.debug("Waiting for records");
- if (!latch.await(30, TimeUnit.SECONDS)) {
- fail("Timed out wait for data to be added to the Kafka cluster");
- }
-
- LOG.debug("Waiting for indices");
-
- client.waitForIndex();
-
- LOG.debug("Waiting for data");
- client.waitForData(expect);
-
- SearchHits hits = client.getData();
-
- assertNotNull(hits);
-
- hits.forEach(this::verifyHit);
- assertEquals(expect, received, "Did not receive the same amount of messages sent");
-
- LOG.debug("Created the consumer ... About to receive messages");
- }
-
@Test
@Timeout(90)
- public void testIndexOperation() {
- try {
- String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
- ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
- .basic()
- .withTopics(topic)
- .withOperation("Index")
- .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
- .withHostAddress(elasticSearch.getHttpHostAddress())
- .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
- .withTransformsConfig("ElasticSearchTransforms")
- .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
- .withEntry("key", transformKey)
- .end();
-
- runTest(propertyFactory);
-
- LOG.debug("Created the consumer ... About to receive messages");
- } catch (Exception e) {
- LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testIndexOperation() throws Exception {
+ ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withOperation("Index")
+ .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
+ .withHostAddress(elasticSearch.getHttpHostAddress())
+ .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
+ .withTransformsConfig("ElasticSearchTransforms")
+ .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
+ .withEntry("key", transformKey)
+ .end();
+
+ runTest(propertyFactory, topicName, expect);
}
@Test
@Timeout(90)
- public void testIndexOperationUsingUrl() {
- try {
- String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
- ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
- .basic()
- .withTopics(topic)
- .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
- .append("hostAddresses", elasticSearch.getHttpHostAddress())
- .append("operation", "Index")
- .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
- .buildUrl()
- .withTransformsConfig("ElasticSearchTransforms")
- .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
- .withEntry("key", transformKey)
- .end();
-
- runTest(propertyFactory);
-
- LOG.debug("Created the consumer ... About to receive messages");
- } catch (Exception e) {
- LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testIndexOperationUsingUrl() throws Exception {
+ ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
+ .append("hostAddresses", elasticSearch.getHttpHostAddress())
+ .append("operation", "Index")
+ .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
+ .buildUrl()
+ .withTransformsConfig("ElasticSearchTransforms")
+ .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
+ .withEntry("key", transformKey)
+ .end();
+
+ runTest(propertyFactory, topicName, expect);
}
}