You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/12 17:58:05 UTC
[camel-kafka-connector] 01/12: Add netty-http itests #1036
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit a9adebc6506f17137b1c3c00f1a792125015054d
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Mar 4 19:50:25 2021 +0900
Add netty-http itests #1036
netty-http source itests are disabled due to #969
---
.../common/SinkConnectorPropertyFactory.java | 12 +--
.../common/SourceConnectorPropertyFactory.java | 29 ++++--
tests/itests-netty-http/pom.xml | 56 +++++++++++
.../sink/CamelNettyhttpPropertyFactory.java | 65 ++++++++++++
.../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 111 +++++++++++++++++++++
.../source/CamelNettyhttpPropertyFactory.java | 63 ++++++++++++
.../source/CamelSourceNettyhttpITCase.java | 109 ++++++++++++++++++++
tests/itests-parent/pom.xml | 19 +++-
tests/pom.xml | 1 +
9 files changed, 446 insertions(+), 19 deletions(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
index 0684164..356ee0d 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
@@ -17,17 +17,15 @@
package org.apache.camel.kafkaconnector.common;
-public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
+import static org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF;
- public T withTopics(String topics) {
- getProperties().put("topics", topics);
+public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
- return (T) this;
+ public T withTopics(String topics) {
+ return setProperty("topics", topics);
}
public T withSinkUrl(String sinkUrl) {
- getProperties().put("camel.sink.url", sinkUrl);
-
- return (T) this;
+ return setProperty(CAMEL_SINK_URL_CONF, sinkUrl);
}
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
index 684459c..aa59552 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
@@ -17,25 +17,32 @@
package org.apache.camel.kafkaconnector.common;
-public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
+import org.apache.camel.LoggingLevel;
- public T withKafkaTopic(String topic) {
- getProperties().put("topics", topic);
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME;
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF;
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.TOPIC_CONF;
+
+public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
- return (T) this;
+ public T withKafkaTopic(String topic) {
+ return setProperty(TOPIC_CONF, topic);
}
public T withSourceUrl(String sourceUrl) {
- getProperties().put("camel.source.url", sourceUrl);
+ return setProperty(CAMEL_SOURCE_URL_CONF, sourceUrl);
+ }
- return (T) this;
+ public T withSourceContentLogginglevel(LoggingLevel level) {
+ return setProperty(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, level.toString());
}
public T withAggregate(String aggregate, int size, int timeout) {
- withBeans("aggregate", classRef(aggregate));
- getProperties().put("camel.aggregation.size", size);
- getProperties().put("camel.aggregation.timeout", timeout);
-
- return (T) this;
+ return withBeans(CAMEL_CONNECTOR_AGGREGATE_NAME, classRef(aggregate))
+ .setProperty(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, size)
+ .setProperty(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, timeout);
}
}
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
new file mode 100644
index 0000000..16238cf
--- /dev/null
+++ b/tests/itests-netty-http/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-netty-http</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: Netty HTTP</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-netty-http</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java
new file mode 100644
index 0000000..9754e7d
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+final class CamelNettyhttpPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
+
+ private CamelNettyhttpPropertyFactory() {
+ }
+
+ public CamelNettyhttpPropertyFactory withProtocol(String value) {
+ return setProperty("camel.sink.path.protocol", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withHost(String value) {
+ return setProperty("camel.sink.path.host", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPort(int value) {
+ return setProperty("camel.sink.path.port", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPath(String value) {
+ return setProperty("camel.sink.path.path", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withDisconnect(boolean value) {
+ return setProperty("camel.sink.endpoint.disconnect", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withSync(boolean value) {
+ return setProperty("camel.sink.endpoint.sync", value);
+ }
+
+ public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
+ String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
+ return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+ }
+
+ public static CamelNettyhttpPropertyFactory basic() {
+ return new CamelNettyhttpPropertyFactory()
+ .withName("CamelNettyhttpSinkConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSinkConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+ }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
new file mode 100644
index 0000000..cdf8b2c
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class);
+
+ private MockWebServer mockServer;
+
+ private String topicName;
+
+ private final int expect = 1;
+ private volatile RecordedRequest received;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-netty-http-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ topicName = getTopicForTest(this);
+ mockServer = new MockWebServer();
+ received = null;
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (mockServer != null) {
+ mockServer.shutdown();
+ }
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ received = mockServer.takeRequest();
+ } catch (InterruptedException e) {
+ LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ String expected = "Sink test message 0";
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ assertEquals("/test", received.getPath(), "Received path differed");
+ assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+
+ @Test
+ @Timeout(30)
+ public void testBasicSendReceive() throws Exception {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+ .withTopics(topicName)
+ .withProtocol("http")
+ .withHost(mockServer.getHostName())
+ .withPort(mockServer.getPort())
+ .withPath("test");
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+
+ @Test
+ @Timeout(30)
+ public void testBasicSendReceiveUsingUrl() throws Exception {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+ .withTopics(topicName)
+ .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
+ .buildUrl();
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
new file mode 100644
index 0000000..d97340f
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
+
+ private CamelNettyhttpPropertyFactory() {
+ }
+
+ public CamelNettyhttpPropertyFactory withProtocol(String value) {
+ return setProperty("camel.source.path.protocol", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withHost(String value) {
+ return setProperty("camel.source.path.host", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPort(int value) {
+ return setProperty("camel.source.path.port", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPath(String value) {
+ return setProperty("camel.source.path.path", value);
+ }
+
+ public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
+ String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
+ return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+ }
+
+ public static CamelNettyhttpPropertyFactory basic() {
+ return new CamelNettyhttpPropertyFactory()
+ .withName("CamelNettyhttpSourceConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withTransformsConfig("tostring")
+ .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value")
+ .withEntry("target.type", "java.lang.String")
+ .end()
+ .withSourceContentLogginglevel(LoggingLevel.DEBUG);
+ }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
new file mode 100644
index 0000000..e1c28de
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969")
+public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport {
+ private final String host = NetworkUtils.getHostname();
+ private final int port = NetworkUtils.getFreePort();
+
+ private final int expect = 1;
+ private String topicName;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-netty-http-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ topicName = getTopicForTest(this);
+ }
+
+ @Override
+ protected void produceTestData() {
+ TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
+ sendMessage();
+ }
+
+ void sendMessage() {
+ OkHttpClient client = new OkHttpClient();
+ RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!");
+ Request request = new Request.Builder()
+ .url("http://" + host + ":" + port + "/test")
+ .post(body)
+ .build();
+ try (Response response = client.newCall(request).execute()) {
+ assertEquals(200, response.code(), "Source endpoint didn't return 200");
+ } catch (IOException e) {
+ fail(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
+ String receivedObject = (String) consumer.consumedMessages().get(0).value();
+ assertEquals(expect, received, "Did not receive as many messages as expected");
+ assertEquals("Hello CKC!", receivedObject, "Received message content differed");
+ }
+
+ @Test
+ @Timeout(30)
+ public void testLaunchConnector() throws ExecutionException, InterruptedException {
+ CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+ .withKafkaTopic(topicName)
+ .withProtocol("http")
+ .withHost(host)
+ .withPort(port)
+ .withPath("test");
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+
+ @Test
+ @Timeout(30)
+ public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
+ CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+ .withKafkaTopic(topicName)
+ .withUrl("http", host, port, "test")
+ .buildUrl();
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+}
diff --git a/tests/itests-parent/pom.xml b/tests/itests-parent/pom.xml
index c47a0a5..ccc4695 100644
--- a/tests/itests-parent/pom.xml
+++ b/tests/itests-parent/pom.xml
@@ -188,6 +188,23 @@
</dependency>
</dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${squareup-okhttp-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${squareup-okhttp-version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<build>
<plugins>
<plugin>
@@ -220,4 +237,4 @@
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/tests/pom.xml b/tests/pom.xml
index 8f5e1d8..37f2cf0 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -64,6 +64,7 @@
<module>itests-couchbase</module>
<module>itests-ssh</module>
<module>itests-sql</module>
+ <module>itests-netty-http</module>
</modules>