You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:50 UTC

[camel-kafka-connector] 07/22: Convert the ElasticSearch tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 34dec6e33b7b6324fb1b6d57a400e6f47cebbe50
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:07:53 2021 +0100

    Convert the ElasticSearch tests to the new reusable sink test base class
---
 .../sink/CamelSinkElasticSearchITCase.java         | 159 ++++++++-------------
 1 file changed, 60 insertions(+), 99 deletions(-)

diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
index 8358aac..c80f892 100644
--- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
@@ -17,16 +17,12 @@
 
 package org.apache.camel.kafkaconnector.elasticsearch.sink;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.elasticsearch.clients.ElasticSearchClient;
 import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon;
 import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchService;
@@ -41,26 +37,28 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
-public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
+public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class);
 
     private ElasticSearchClient client;
+    private String topicName;
 
     private final int expect = 10;
     private int received;
     private final String transformKey = "index-test";
 
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-elasticsearch-rest-kafka-connector"};
@@ -68,32 +66,45 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         client = new ElasticSearchClient(elasticSearch.getElasticSearchHost(), elasticSearch.getPort(),
                 ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX);
 
         received = 0;
     }
 
-    private void putRecords(CountDownLatch latch) {
-        LOG.debug("Sending records to Kafka");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            for (int i = 0; i < expect; i++) {
-                try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
+            client.waitForIndex();
+
+            LOG.debug("Waiting for data");
+            client.waitForData(expect);
         } finally {
             latch.countDown();
         }
 
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            SearchHits hits = client.getData();
+            assertNotNull(hits);
+
+            hits.forEach(this::verifyHit);
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void verifyHit(SearchHit searchHit) {
         String source = searchHit.getSourceAsString();
 
@@ -107,91 +118,41 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
         received++;
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        LOG.debug("Performing initialization");
-        getKafkaConnectService().initializeConnector(propertyFactory);
-
-        LOG.debug("Initialization complete");
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        LOG.debug("Waiting for records");
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        client.waitForIndex();
-
-        LOG.debug("Waiting for data");
-        client.waitForData(expect);
-
-        SearchHits hits = client.getData();
-
-        assertNotNull(hits);
-
-        hits.forEach(this::verifyHit);
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
 
     @Test
     @Timeout(90)
-    public void testIndexOperation() {
-        try {
-            String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
-            ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
-                    .basic()
-                    .withTopics(topic)
-                    .withOperation("Index")
-                    .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
-                    .withHostAddress(elasticSearch.getHttpHostAddress())
-                    .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
-                    .withTransformsConfig("ElasticSearchTransforms")
-                        .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
-                        .withEntry("key", transformKey)
-                        .end();
-
-            runTest(propertyFactory);
-
-            LOG.debug("Created the consumer ... About to receive messages");
-        } catch (Exception e) {
-            LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testIndexOperation() throws Exception {
+        ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withOperation("Index")
+                .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
+                .withHostAddress(elasticSearch.getHttpHostAddress())
+                .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
+                .withTransformsConfig("ElasticSearchTransforms")
+                    .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
+                    .withEntry("key", transformKey)
+                    .end();
+
+        runTest(propertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testIndexOperationUsingUrl() {
-        try {
-            String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
-            ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
-                    .basic()
-                    .withTopics(topic)
-                    .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
-                        .append("hostAddresses", elasticSearch.getHttpHostAddress())
-                        .append("operation", "Index")
-                        .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
-                        .buildUrl()
-                    .withTransformsConfig("ElasticSearchTransforms")
-                        .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
-                        .withEntry("key", transformKey)
-                        .end();
-
-            runTest(propertyFactory);
-
-            LOG.debug("Created the consumer ... About to receive messages");
-        } catch (Exception e) {
-            LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testIndexOperationUsingUrl() throws Exception {
+        ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
+                    .append("hostAddresses", elasticSearch.getHttpHostAddress())
+                    .append("operation", "Index")
+                    .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
+                    .buildUrl()
+                .withTransformsConfig("ElasticSearchTransforms")
+                    .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
+                    .withEntry("key", transformKey)
+                    .end();
+
+        runTest(propertyFactory, topicName, expect);
     }
 }