You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/12 17:58:08 UTC

[camel-kafka-connector] 04/12: Fixed itest for netty-http.

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 7c383363c6ec1385149547cea6307587b6455f23
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat Mar 6 02:44:30 2021 +0100

    Fixed itest for netty-http.
---
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java   |   5 +-
 .../source/CamelNettyhttpPropertyFactory.java      |  63 ------------
 .../source/CamelSourceNettyHTTPITCase.java         |   2 +-
 .../source/CamelSourceNettyhttpITCase.java         | 109 ---------------------
 tests/pom.xml                                      |   1 -
 5 files changed, 4 insertions(+), 176 deletions(-)

diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
index cdf8b2c..96bd27a 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.nettyhttp.sink;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
@@ -94,7 +95,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
                 .withHost(mockServer.getHostName())
                 .withPort(mockServer.getPort())
                 .withPath("test");
-
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
         runTest(connectorPropertyFactory, topicName, expect);
     }
 
@@ -105,7 +106,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
                 .withTopics(topicName)
                 .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
                 .buildUrl();
-
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
         runTest(connectorPropertyFactory, topicName, expect);
     }
 }
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
deleted file mode 100644
index d97340f..0000000
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.nettyhttp.source;
-
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
-import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
-
-final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
-
-    private CamelNettyhttpPropertyFactory() {
-    }
-
-    public CamelNettyhttpPropertyFactory withProtocol(String value) {
-        return setProperty("camel.source.path.protocol", value);
-    }
-
-    public CamelNettyhttpPropertyFactory withHost(String value) {
-        return setProperty("camel.source.path.host", value);
-    }
-
-    public CamelNettyhttpPropertyFactory withPort(int value) {
-        return setProperty("camel.source.path.port", value);
-    }
-
-    public CamelNettyhttpPropertyFactory withPath(String value) {
-        return setProperty("camel.source.path.path", value);
-    }
-
-    public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
-        String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
-        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
-    }
-
-    public static CamelNettyhttpPropertyFactory basic() {
-        return new CamelNettyhttpPropertyFactory()
-                .withName("CamelNettyhttpSourceConnector")
-                .withTasksMax(1)
-                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
-                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
-                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
-                .withTransformsConfig("tostring")
-                .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value")
-                .withEntry("target.type", "java.lang.String")
-                .end()
-                .withSourceContentLogginglevel(LoggingLevel.DEBUG);
-    }
-}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 41cb6e1..48bcb59 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
-    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000);
     private static final String TEST_MESSAGE = "testMessage";
 
     private String topicName;
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
deleted file mode 100644
index e1c28de..0000000
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.nettyhttp.source;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
-import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
-import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
-
-@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969")
-public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport {
-    private final String host = NetworkUtils.getHostname();
-    private final int port = NetworkUtils.getFreePort();
-
-    private final int expect = 1;
-    private String topicName;
-
-    @Override
-    protected String[] getConnectorsInTest() {
-        return new String[] {"camel-netty-http-kafka-connector"};
-    }
-
-    @BeforeEach
-    public void setUp() {
-        topicName = getTopicForTest(this);
-    }
-
-    @Override
-    protected void produceTestData() {
-        TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
-        sendMessage();
-    }
-
-    void sendMessage() {
-        OkHttpClient client = new OkHttpClient();
-        RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!");
-        Request request = new Request.Builder()
-                .url("http://" + host + ":" + port + "/test")
-                .post(body)
-                .build();
-        try (Response response = client.newCall(request).execute()) {
-            assertEquals(200, response.code(), "Source endpoint didn't return 200");
-        } catch (IOException e) {
-            fail(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    protected void verifyMessages(TestMessageConsumer<?> consumer) {
-        int received = consumer.consumedMessages().size();
-        String receivedObject = (String) consumer.consumedMessages().get(0).value();
-        assertEquals(expect, received, "Did not receive as many messages as expected");
-        assertEquals("Hello CKC!", receivedObject, "Received message content differed");
-    }
-
-    @Test
-    @Timeout(30)
-    public void testLaunchConnector() throws ExecutionException, InterruptedException {
-        CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
-                .withKafkaTopic(topicName)
-                .withProtocol("http")
-                .withHost(host)
-                .withPort(port)
-                .withPath("test");
-
-        runTest(connectorPropertyFactory, topicName, expect);
-    }
-
-    @Test
-    @Timeout(30)
-    public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
-        CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
-                .withKafkaTopic(topicName)
-                .withUrl("http", host, port, "test")
-                .buildUrl();
-
-        runTest(connectorPropertyFactory, topicName, expect);
-    }
-}
diff --git a/tests/pom.xml b/tests/pom.xml
index c735d3b..e30a7aa 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -55,7 +55,6 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
-        <module>itests-netty-http</module>
         <module>itests-jdbc</module>
         <module>itests-azure-common</module>
         <module>itests-azure-storage-blob</module>