You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:43 UTC
[camel-kafka-connector] 10/18: Convert the HTTP tests to the new
reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit be88da601a3f0e929bfb4415707faabb088707b3
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:34:44 2021 +0100
Convert the HTTP tests to the new reusable sink test base class
---
.../http/sink/CamelSinkHTTPITCase.java | 106 ++++++++++-----------
1 file changed, 48 insertions(+), 58 deletions(-)
diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
index 17aa85f..ea5d2db 100644
--- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
+++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
@@ -20,17 +20,15 @@ package org.apache.camel.kafkaconnector.http.sink;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.http.impl.bootstrap.HttpServer;
import org.apache.http.impl.bootstrap.ServerBootstrap;
import org.junit.jupiter.api.AfterEach;
@@ -45,13 +43,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkHTTPITCase extends AbstractKafkaTest {
+public class CamelSinkHTTPITCase extends CamelSinkTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkHTTPITCase.class);
private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
private HttpServer localServer;
private HTTPTestValidationHandler validationHandler;
+ private List<String> replies;
+ private String topicName;
private final int expect = 10;
@@ -62,6 +62,8 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() throws IOException {
+ topicName = getTopicForTest(this);
+
validationHandler = new HTTPTestValidationHandler(10);
byte[] ipAddr = new byte[]{127, 0, 0, 1};
InetAddress localhost = InetAddress.getByAddress(ipAddr);
@@ -83,76 +85,64 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest {
}
}
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
- private void putRecords() {
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
- for (int i = 0; i < expect; i++) {
- try {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
- } catch (ExecutionException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- break;
- }
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ replies = validationHandler
+ .getReplies()
+ .get(30, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.error("Unable to ret replies: {}", e.getMessage(), e);
+ } finally {
+ latch.countDown();
}
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, TimeoutException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
- ExecutorService service = Executors.newCachedThreadPool();
- service.submit(this::putRecords);
-
- LOG.debug("Created the consumer ... About to receive messages");
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ if (replies == null) {
+ fail("Some messages should have been exchanged, but none seems to have gone through");
+ }
- List<String> replies = validationHandler.getReplies().get(30, TimeUnit.SECONDS);
- if (replies == null) {
- fail("Some messages should have been exchanged, but none seems to have gone through");
- }
+ for (String reply : replies) {
+ LOG.debug("Received: {} ", reply);
+ }
- for (String reply : replies) {
- LOG.debug("Received: {} ", reply);
+ assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent");
+ } else {
+ fail("Failed to receive the messages within the specified time");
}
-
- assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent");
-
}
+
@Test
@Timeout(90)
- public void testBasicSendReceive() {
- try {
- String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
+ public void testBasicSendReceive() throws Exception {
+ String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
- ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withHttpUri(url);
+ ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
+ .withTopics(topicName)
+ .withHttpUri(url);
- runTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("HTTP test failed: {} {}", e.getMessage(), e);
- fail(e.getMessage(), e);
- }
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@Timeout(90)
- public void testBasicSendReceiveUsingUrl() {
- try {
- String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
+ public void testBasicSendReceiveUsingUrl() throws Exception {
+ String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
- ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withUrl(hostName)
- .buildUrl();
+ ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
+ .withTopics(topicName)
+ .withUrl(hostName)
+ .buildUrl();
-
- runTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("HTTP test failed: {} {}", e.getMessage(), e);
- fail(e.getMessage(), e);
- }
+ runTest(connectorPropertyFactory, topicName, expect);
}
}