You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/12 17:58:14 UTC
[camel-kafka-connector] 10/12: Fixed backported netty-http tests
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b264559126650aa4c7fe748d1c418a8769f818a7
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 12 17:03:25 2021 +0100
Fixed backported netty-http tests
---
.../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 59 ++++++++++++-------
.../source/CamelSourceNettyHTTPITCase.java | 68 ++++++++++++++--------
2 files changed, 84 insertions(+), 43 deletions(-)
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
index 96bd27a..db1e27c 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -17,14 +17,18 @@
package org.apache.camel.kafkaconnector.nettyhttp.sink;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -35,7 +39,7 @@ import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
-public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
+public class CamelSinkNettyhttpITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class);
private MockWebServer mockServer;
@@ -52,7 +56,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
@BeforeEach
public void setUp() {
- topicName = getTopicForTest(this);
+ topicName = TestUtils.getDefaultTestTopic(this.getClass());
mockServer = new MockWebServer();
received = null;
}
@@ -64,28 +68,43 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
}
}
- @Override
- protected void consumeMessages(CountDownLatch latch) {
+ protected void verifyMessages() throws InterruptedException {
+ String expected = "test 0";
try {
- received = mockServer.takeRequest();
+ received = mockServer.takeRequest(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOG.error("Unable to receive messages: {}", e.getMessage(), e);
- } finally {
- latch.countDown();
+ LOG.error("Unable to receive http requests: {}", e.getMessage(), e);
+ fail("Failed to receive the messages within the specified time");
}
+ assertEquals("/test", received.getPath(), "Received path differed");
+ assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
}
- @Override
- protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
- String expected = "Sink test message 0";
- if (latch.await(30, TimeUnit.SECONDS)) {
- assertEquals("/test", received.getPath(), "Received path differed");
- assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
- } else {
- fail("Failed to receive the messages within the specified time");
+ private void putRecords() {
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+ for (int i = 0; i < expect; i++) {
+ try {
+ kafkaClient.produce(topicName, "test " + i);
+ } catch (ExecutionException e) {
+ LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+ } catch (InterruptedException e) {
+ break;
+ }
}
}
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+ connectorPropertyFactory.log();
+
+ getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+ ExecutorService service = Executors.newCachedThreadPool();
+ service.submit(() -> putRecords());
+
+ verifyMessages();
+ }
+
@Test
@Timeout(30)
public void testBasicSendReceive() throws Exception {
@@ -96,7 +115,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
.withPort(mockServer.getPort())
.withPath("test");
mockServer.enqueue(new MockResponse().setResponseCode(200));
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory);
}
@Test
@@ -107,6 +126,6 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
.withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
.buildUrl();
mockServer.enqueue(new MockResponse().setResponseCode(200));
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory);
}
}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 0174eb1..fe5d884 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -18,16 +18,19 @@ package org.apache.camel.kafkaconnector.nettyhttp.source;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
-import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -39,14 +42,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+public class CamelSourceNettyHTTPITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
- private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000);
+ private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 20000, 29000);
private static final String TEST_MESSAGE = "testMessage";
private String topicName;
private final int expect = 1;
+ private int received;
@Override
protected String[] getConnectorsInTest() {
@@ -54,26 +58,28 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
}
@BeforeEach
- public void setUp() throws IOException {
- topicName = getTopicForTest(this);
+ public void setUp() {
+ topicName = TestUtils.getDefaultTestTopic(this.getClass());
}
- @Test
- @Timeout(90)
- public void testBasicSendReceive() throws Exception {
+ protected void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+ connectorPropertyFactory.log();
- ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
- .withKafkaTopic(topicName)
- .withReceiveBufferSize(10)
- .withHost("0.0.0.0")
- .withPort(HTTP_PORT)
- .withProtocol("http")
- .withCamelTypeConverterTransformTo("java.lang.String");
+ getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+ LOG.debug("Sending http request");
+ produceTestData();
+ LOG.debug("Http request sent");
+
+ LOG.debug("Creating the consumer ...");
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ LOG.debug("Consuming messages ...");
+ kafkaClient.consume(topicName, this::checkRecord);
+ LOG.debug("Messages consumed.");
- runTestBlocking(connectorPropertyFactory, topicName, expect);
+ assertEquals(received, expect, "Didn't process the expected amount of messages");
}
- @Override
protected void produceTestData() {
int retriesLeft = 10;
boolean success = false;
@@ -110,10 +116,26 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
}
}
- @Override
- protected void verifyMessages(TestMessageConsumer<?> consumer) {
- int received = consumer.consumedMessages().size();
- assertEquals(expect, received, "Didn't process the expected amount of messages");
- assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+ protected <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+ LOG.debug("Received: {}", record.value());
+ received++;
+ assertEquals(TEST_MESSAGE, record.value().toString());
+
+ return false;
+ }
+
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceive() throws Exception {
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+ .withKafkaTopic(topicName)
+ .withReceiveBufferSize(10)
+ .withHost("0.0.0.0")
+ .withPort(HTTP_PORT)
+ .withProtocol("http")
+ .withCamelTypeConverterTransformTo("java.lang.String");
+
+ runTest(connectorPropertyFactory);
}
}