You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/12 17:58:08 UTC
[camel-kafka-connector] 04/12: Fixed itest for netty-http.
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 7c383363c6ec1385149547cea6307587b6455f23
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat Mar 6 02:44:30 2021 +0100
Fixed itest for netty-http.
---
.../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 5 +-
.../source/CamelNettyhttpPropertyFactory.java | 63 ------------
.../source/CamelSourceNettyHTTPITCase.java | 2 +-
.../source/CamelSourceNettyhttpITCase.java | 109 ---------------------
tests/pom.xml | 1 -
5 files changed, 4 insertions(+), 176 deletions(-)
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
index cdf8b2c..96bd27a 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.nettyhttp.sink;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
@@ -94,7 +95,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
.withHost(mockServer.getHostName())
.withPort(mockServer.getPort())
.withPath("test");
-
+ mockServer.enqueue(new MockResponse().setResponseCode(200));
runTest(connectorPropertyFactory, topicName, expect);
}
@@ -105,7 +106,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
.withTopics(topicName)
.withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
.buildUrl();
-
+ mockServer.enqueue(new MockResponse().setResponseCode(200));
runTest(connectorPropertyFactory, topicName, expect);
}
}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
deleted file mode 100644
index d97340f..0000000
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.nettyhttp.source;
-
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
-import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
-
-final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
-
- private CamelNettyhttpPropertyFactory() {
- }
-
- public CamelNettyhttpPropertyFactory withProtocol(String value) {
- return setProperty("camel.source.path.protocol", value);
- }
-
- public CamelNettyhttpPropertyFactory withHost(String value) {
- return setProperty("camel.source.path.host", value);
- }
-
- public CamelNettyhttpPropertyFactory withPort(int value) {
- return setProperty("camel.source.path.port", value);
- }
-
- public CamelNettyhttpPropertyFactory withPath(String value) {
- return setProperty("camel.source.path.path", value);
- }
-
- public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
- String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
- return new EndpointUrlBuilder<>(this::withSourceUrl, url);
- }
-
- public static CamelNettyhttpPropertyFactory basic() {
- return new CamelNettyhttpPropertyFactory()
- .withName("CamelNettyhttpSourceConnector")
- .withTasksMax(1)
- .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
- .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
- .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
- .withTransformsConfig("tostring")
- .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value")
- .withEntry("target.type", "java.lang.String")
- .end()
- .withSourceContentLogginglevel(LoggingLevel.DEBUG);
- }
-}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 41cb6e1..48bcb59 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
- private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+ private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000);
private static final String TEST_MESSAGE = "testMessage";
private String topicName;
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
deleted file mode 100644
index e1c28de..0000000
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.nettyhttp.source;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
-import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
-import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
-
-@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969")
-public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport {
- private final String host = NetworkUtils.getHostname();
- private final int port = NetworkUtils.getFreePort();
-
- private final int expect = 1;
- private String topicName;
-
- @Override
- protected String[] getConnectorsInTest() {
- return new String[] {"camel-netty-http-kafka-connector"};
- }
-
- @BeforeEach
- public void setUp() {
- topicName = getTopicForTest(this);
- }
-
- @Override
- protected void produceTestData() {
- TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
- sendMessage();
- }
-
- void sendMessage() {
- OkHttpClient client = new OkHttpClient();
- RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!");
- Request request = new Request.Builder()
- .url("http://" + host + ":" + port + "/test")
- .post(body)
- .build();
- try (Response response = client.newCall(request).execute()) {
- assertEquals(200, response.code(), "Source endpoint didn't return 200");
- } catch (IOException e) {
- fail(e.getMessage(), e);
- }
- }
-
- @Override
- protected void verifyMessages(TestMessageConsumer<?> consumer) {
- int received = consumer.consumedMessages().size();
- String receivedObject = (String) consumer.consumedMessages().get(0).value();
- assertEquals(expect, received, "Did not receive as many messages as expected");
- assertEquals("Hello CKC!", receivedObject, "Received message content differed");
- }
-
- @Test
- @Timeout(30)
- public void testLaunchConnector() throws ExecutionException, InterruptedException {
- CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
- .withKafkaTopic(topicName)
- .withProtocol("http")
- .withHost(host)
- .withPort(port)
- .withPath("test");
-
- runTest(connectorPropertyFactory, topicName, expect);
- }
-
- @Test
- @Timeout(30)
- public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
- CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
- .withKafkaTopic(topicName)
- .withUrl("http", host, port, "test")
- .buildUrl();
-
- runTest(connectorPropertyFactory, topicName, expect);
- }
-}
diff --git a/tests/pom.xml b/tests/pom.xml
index c735d3b..e30a7aa 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -55,7 +55,6 @@
<module>itests-salesforce</module>
<module>itests-hdfs</module>
<module>itests-mongodb</module>
- <module>itests-netty-http</module>
<module>itests-jdbc</module>
<module>itests-azure-common</module>
<module>itests-azure-storage-blob</module>