You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:43 UTC

[camel-kafka-connector] 10/18: Convert the HTTP tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit be88da601a3f0e929bfb4415707faabb088707b3
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:34:44 2021 +0100

    Convert the HTTP tests to the new reusable sink test base class
---
 .../http/sink/CamelSinkHTTPITCase.java             | 106 ++++++++++-----------
 1 file changed, 48 insertions(+), 58 deletions(-)

diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
index 17aa85f..ea5d2db 100644
--- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
+++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
@@ -20,17 +20,15 @@ package org.apache.camel.kafkaconnector.http.sink;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.http.impl.bootstrap.HttpServer;
 import org.apache.http.impl.bootstrap.ServerBootstrap;
 import org.junit.jupiter.api.AfterEach;
@@ -45,13 +43,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkHTTPITCase extends AbstractKafkaTest {
+public class CamelSinkHTTPITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkHTTPITCase.class);
     private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
 
     private HttpServer localServer;
 
     private HTTPTestValidationHandler validationHandler;
+    private List<String> replies;
+    private String topicName;
 
     private final int expect = 10;
 
@@ -62,6 +62,8 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+
         validationHandler = new HTTPTestValidationHandler(10);
         byte[] ipAddr = new byte[]{127, 0, 0, 1};
         InetAddress localhost = InetAddress.getByAddress(ipAddr);
@@ -83,76 +85,64 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-    private void putRecords() {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
-            } catch (ExecutionException e) {
-                e.printStackTrace();
-            } catch (InterruptedException e) {
-                break;
-            }
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            replies = validationHandler
+                    .getReplies()
+                    .get(30, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Unable to ret replies: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, TimeoutException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(this::putRecords);
-
-        LOG.debug("Created the consumer ... About to receive messages");
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            if (replies == null) {
+                fail("Some messages should have been exchanged, but none seems to have gone through");
+            }
 
-        List<String> replies = validationHandler.getReplies().get(30, TimeUnit.SECONDS);
-        if (replies == null) {
-            fail("Some messages should have been exchanged, but none seems to have gone through");
-        }
+            for (String reply : replies) {
+                LOG.debug("Received: {} ", reply);
+            }
 
-        for (String reply : replies) {
-            LOG.debug("Received: {} ", reply);
+            assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent");
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
-
-        assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent");
-
     }
 
+
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
+    public void testBasicSendReceive() throws Exception {
+        String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
 
-            ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHttpUri(url);
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
+                .withTopics(topicName)
+                .withHttpUri(url);
 
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
 
-            ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withUrl(hostName)
-                        .buildUrl();
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl(hostName)
+                    .buildUrl();
 
-
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }