Posted to commits@camel.apache.org by va...@apache.org on 2021/03/12 17:58:14 UTC

[camel-kafka-connector] 10/12: Fixed backported netty-http tests

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b264559126650aa4c7fe748d1c418a8769f818a7
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 12 17:03:25 2021 +0100

    Fixed backported netty-http tests
---
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java   | 59 ++++++++++++-------
 .../source/CamelSourceNettyHTTPITCase.java         | 68 ++++++++++++++--------
 2 files changed, 84 insertions(+), 43 deletions(-)

diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
index 96bd27a..db1e27c 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -17,14 +17,18 @@
 
 package org.apache.camel.kafkaconnector.nettyhttp.sink;
 
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -35,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
+public class CamelSinkNettyhttpITCase extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class);
 
     private MockWebServer mockServer;
@@ -52,7 +56,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
 
     @BeforeEach
     public void setUp() {
-        topicName = getTopicForTest(this);
+        topicName = TestUtils.getDefaultTestTopic(this.getClass());
         mockServer = new MockWebServer();
         received = null;
     }
@@ -64,28 +68,43 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
         }
     }
 
-    @Override
-    protected void consumeMessages(CountDownLatch latch) {
+    protected void verifyMessages() throws InterruptedException {
+        String expected = "test 0";
         try {
-            received = mockServer.takeRequest();
+            received = mockServer.takeRequest(30, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            LOG.error("Unable to receive messages: {}", e.getMessage(), e);
-        } finally {
-            latch.countDown();
+            LOG.error("Unable to receive http requests: {}", e.getMessage(), e);
+            fail("Failed to receive the messages within the specified time");
         }
+        assertEquals("/test", received.getPath(), "Received path differed");
+        assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
     }
 
-    @Override
-    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
-        String expected = "Sink test message 0";
-        if (latch.await(30, TimeUnit.SECONDS)) {
-            assertEquals("/test", received.getPath(), "Received path differed");
-            assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
-        } else {
-            fail("Failed to receive the messages within the specified time");
+    private void putRecords() {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            try {
+                kafkaClient.produce(topicName, "test " + i);
+            } catch (ExecutionException e) {
+                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+            } catch (InterruptedException e) {
+                break;
+            }
         }
     }
 
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        ExecutorService service = Executors.newCachedThreadPool();
+        service.submit(() -> putRecords());
+
+        verifyMessages();
+    }
+
     @Test
     @Timeout(30)
     public void testBasicSendReceive() throws Exception {
@@ -96,7 +115,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
                 .withPort(mockServer.getPort())
                 .withPath("test");
         mockServer.enqueue(new MockResponse().setResponseCode(200));
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory);
     }
 
     @Test
@@ -107,6 +126,6 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
                 .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
                 .buildUrl();
         mockServer.enqueue(new MockResponse().setResponseCode(200));
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory);
     }
 }
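
For context, the sink-side verification introduced above reduces to waiting on the MockWebServer with a bounded timeout and asserting on the recorded request. The following is a minimal, self-contained sketch of that pattern, not the committed code; the class name, path and expected body are illustrative assumptions. Note that takeRequest(timeout, unit) returns null when nothing arrives in time, so a null guard gives a cleaner failure than a NullPointerException:

    // Sketch only: illustrates the MockWebServer verification pattern used in the test above.
    import java.util.concurrent.TimeUnit;

    import okhttp3.mockwebserver.MockWebServer;
    import okhttp3.mockwebserver.RecordedRequest;

    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.junit.jupiter.api.Assertions.assertNotNull;

    class SinkVerificationSketch {
        void verify(MockWebServer mockServer, String expectedBody) throws InterruptedException {
            // takeRequest(timeout, unit) returns null on timeout, so fail explicitly
            // instead of dereferencing a null RecordedRequest.
            RecordedRequest received = mockServer.takeRequest(30, TimeUnit.SECONDS);
            assertNotNull(received, "Failed to receive the request within the specified time");
            assertEquals("/test", received.getPath(), "Received path differed");
            assertEquals(expectedBody, received.getBody().readUtf8(), "Received message content differed");
        }
    }
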
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 0174eb1..fe5d884 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -18,16 +18,19 @@ package org.apache.camel.kafkaconnector.nettyhttp.source;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
 
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
-import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -39,14 +42,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+public class CamelSourceNettyHTTPITCase extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
-    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000);
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 20000, 29000);
     private static final String TEST_MESSAGE = "testMessage";
 
     private String topicName;
 
     private final int expect = 1;
+    private int received;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -54,26 +58,28 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
     }
 
     @BeforeEach
-    public void setUp() throws IOException {
-        topicName = getTopicForTest(this);
+    public void setUp() {
+        topicName = TestUtils.getDefaultTestTopic(this.getClass());
     }
 
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() throws Exception {
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
 
-        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
-                .withKafkaTopic(topicName)
-                .withReceiveBufferSize(10)
-                .withHost("0.0.0.0")
-                .withPort(HTTP_PORT)
-                .withProtocol("http")
-                .withCamelTypeConverterTransformTo("java.lang.String");
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        LOG.debug("Sending http request");
+        produceTestData();
+        LOG.debug("Http request sent");
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        LOG.debug("Consuming messages ...");
+        kafkaClient.consume(topicName, this::checkRecord);
+        LOG.debug("Messages consumed.");
 
-        runTestBlocking(connectorPropertyFactory, topicName, expect);
+        assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
-    @Override
     protected void produceTestData() {
         int retriesLeft = 10;
         boolean success = false;
@@ -110,10 +116,26 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
         }
     }
 
-    @Override
-    protected void verifyMessages(TestMessageConsumer<?> consumer) {
-        int received = consumer.consumedMessages().size();
-        assertEquals(expect, received, "Didn't process the expected amount of messages");
-        assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+    protected <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+        received++;
+        assertEquals(TEST_MESSAGE, record.value().toString());
+
+        return false;
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+                .withKafkaTopic(topicName)
+                .withReceiveBufferSize(10)
+                .withHost("0.0.0.0")
+                .withPort(HTTP_PORT)
+                .withProtocol("http")
+                .withCamelTypeConverterTransformTo("java.lang.String");
+
+        runTest(connectorPropertyFactory);
     }
 }
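
The source-side test above follows the opposite flow: it POSTs a payload to the netty-http consumer endpoint with Apache HttpClient and then consumes the resulting records from Kafka via the KafkaClient test helper. Below is a minimal sketch of the HTTP-post half of that flow, assuming a hypothetical host, port and path rather than the committed produceTestData() implementation:

    // Sketch only: posts a test payload to a netty-http endpoint and reports
    // whether the endpoint acknowledged it with HTTP 200.
    import java.io.IOException;

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;

    class SourcePostSketch {
        boolean postTestMessage(String host, int port, String message) throws IOException {
            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpPost post = new HttpPost("http://" + host + ":" + port + "/");
                post.setEntity(new StringEntity(message));
                try (CloseableHttpResponse response = client.execute(post)) {
                    return response.getStatusLine().getStatusCode() == 200;
                }
            }
        }
    }

A retry loop around a helper like this (the committed test retries up to 10 times) accounts for the connector's netty-http consumer not being bound yet when the test starts producing.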