You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2021/03/11 05:39:30 UTC
[camel-kafka-connector] 01/02: Add https itests
This is an automated email from the ASF dual-hosted git repository.
tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 4d52cb141bb3508a68fc54a2fbecf8f8c4d8b7d7
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Mar 11 13:01:11 2021 +0900
Add https itests
---
.../common/BasicConnectorPropertyFactory.java | 4 +
.../common/ConnectorPropertyFactory.java | 2 +-
.../common/SinkConnectorPropertyFactory.java | 7 +
tests/itests-https/pom.xml | 57 +++++++
.../https/sink/CamelHTTPSPropertyFactory.java | 68 ++++++++
.../https/sink/CamelSinkHTTPSITCase.java | 179 +++++++++++++++++++++
.../src/test/resources/client-truststore.jks | Bin 0 -> 844 bytes
.../src/test/resources/server-keystore.jks | Bin 0 -> 2134 bytes
tests/pom.xml | 1 +
9 files changed, 317 insertions(+), 1 deletion(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index 0e98490..20c3558 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -100,6 +100,10 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp
return "#class:" + className;
}
+ public static String classRef(Class<?> clazz) {
+ return "#class:" + clazz.getName();
+ }
+
public T merge(Properties properties) {
Set<Map.Entry<Object, Object>> set = properties.entrySet();
connectorProps.putAll(set.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b)->b)));
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java
index 8ce678d..a89695a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java
@@ -41,6 +41,6 @@ public interface ConnectorPropertyFactory {
Logger log = LoggerFactory.getLogger(ConnectorPropertyFactory.class);
log.info("Using the following properties for the test: ");
- properties.entrySet().forEach(entry -> log.info("{}={}", entry.getKey(), entry.getValue()));
+ properties.forEach((key, value) -> log.info("{}={}", key, value));
}
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
index 356ee0d..d8c70f5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
@@ -17,7 +17,10 @@
package org.apache.camel.kafkaconnector.common;
+import org.apache.camel.LoggingLevel;
+
import static org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF;
public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
@@ -28,4 +31,8 @@ public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorProper
public T withSinkUrl(String sinkUrl) {
return setProperty(CAMEL_SINK_URL_CONF, sinkUrl);
}
+
+ public T withSinkContentLogginglevel(LoggingLevel level) {
+ return setProperty(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, level.toString());
+ }
}
diff --git a/tests/itests-https/pom.xml b/tests/itests-https/pom.xml
new file mode 100644
index 0000000..c550e83
--- /dev/null
+++ b/tests/itests-https/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-https</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: HTTPS</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-http</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java
new file mode 100644
index 0000000..31993ca
--- /dev/null
+++ b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.https.sink;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.support.jsse.KeyStoreParameters;
+import org.apache.camel.support.jsse.SSLContextParameters;
+import org.apache.camel.support.jsse.TrustManagersParameters;
+
+final class CamelHTTPSPropertyFactory extends SinkConnectorPropertyFactory<CamelHTTPSPropertyFactory> {
+ private CamelHTTPSPropertyFactory() {
+ }
+
+ public CamelHTTPSPropertyFactory withHttpUri(String uri) {
+ return setProperty("camel.sink.path.httpUri", uri);
+ }
+
+ public CamelHTTPSPropertyFactory withSslContextParameters(String bean, String keyStore, String password) {
+ withBeans("ksp", classRef(KeyStoreParameters.class));
+ withBeans("ksp.resource", keyStore);
+ withBeans("ksp.password", password);
+
+ withBeans("tmp", classRef(TrustManagersParameters.class));
+ withBeans("tmp.keyStore", "#bean:ksp");
+
+ withBeans(bean, classRef(SSLContextParameters.class));
+ withBeans(bean + ".trustManagers", "#bean:tmp");
+
+ return setProperty("camel.sink.endpoint.sslContextParameters", "#bean:" + bean);
+ }
+
+ public CamelHTTPSPropertyFactory withX509HostnameVerifier(String bean, Class<?> verifierClass) {
+ withBeans(bean, classRef(verifierClass));
+ return setProperty("camel.sink.endpoint.x509HostnameVerifier", "#bean:" + bean);
+ }
+
+ public EndpointUrlBuilder<CamelHTTPSPropertyFactory> withUrl(String host, int port, String path) {
+ String url = String.format("https://%s:%s/%s", host, port, path);
+ return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+ }
+
+ public static CamelHTTPSPropertyFactory basic() {
+ return new CamelHTTPSPropertyFactory()
+ .withName("CamelHttpsSinkConnector")
+ .withConnectorClass("org.apache.camel.kafkaconnector.https.CamelHttpsSinkConnector")
+ .withTasksMax(1)
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withSinkContentLogginglevel(LoggingLevel.DEBUG);
+ }
+}
diff --git a/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java
new file mode 100644
index 0000000..e5d6672
--- /dev/null
+++ b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.https.sink;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkHTTPSITCase extends CamelSinkTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkHTTPSITCase.class);
+
+ private final String host = NetworkUtils.getHostname();
+ private final int port = NetworkUtils.getFreePort(host);
+
+ private MockWebServer mockServer;
+
+ private String topicName;
+
+ private final int expect = 10;
+ private List<RecordedRequest> received;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-https-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ topicName = getTopicForTest(this);
+
+ setupHttpsMockServer();
+ received = Collections.emptyList();
+ }
+
+ private void setupHttpsMockServer() throws Exception {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(getClass().getResourceAsStream("/server-keystore.jks"), "secret".toCharArray());
+ KeyManagerFactory kmFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmFactory.init(keyStore, "secret".toCharArray());
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(kmFactory.getKeyManagers(), null, null);
+ mockServer = new MockWebServer();
+ mockServer.useHttps(sslContext.getSocketFactory(), false);
+ }
+
+ private void startMockServer() throws IOException {
+ IntStream.range(0, expect).forEach(i -> {
+ mockServer.enqueue(new MockResponse().setResponseCode(200));
+ });
+ mockServer.start(InetAddress.getByName(host), port);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (mockServer != null) {
+ mockServer.shutdown();
+ }
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ received = IntStream.range(0, expect).mapToObj(i -> {
+ try {
+ return mockServer.takeRequest(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+ return null;
+ }
+ }).collect(Collectors.toList());
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ String expected = "Sink test message ";
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ assertEquals(expect, received.size(), "Did not receive the same amount of messages that were sent");
+
+
+ for (RecordedRequest request : received) {
+ String actual = request.getBody().readUtf8();
+ LOG.debug("Received: {} ", actual);
+
+ assertEquals("/ckc", request.getPath(), "Received path differed");
+ assertTrue(actual.startsWith(expected), "Received message content differed");
+ }
+
+ assertEquals(expect, received.size(), "Did not receive the same amount of messages that were sent");
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testBasicSendReceive() throws Exception {
+ startMockServer();
+
+ String uri = mockServer.getHostName() + ":" + mockServer.getPort() + "/ckc";
+ ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPSPropertyFactory.basic()
+ .withTopics(topicName)
+ .withHttpUri(uri)
+ .withSslContextParameters("scp", toPath("client-truststore.jks"), "secret")
+ // let's skip host verification as hostname may vary depending on test env
+ .withX509HostnameVerifier("x509HostnameVerifier", NoopHostnameVerifier.class);
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testBasicSendReceiveUsingUrl() throws Exception {
+ startMockServer();
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPSPropertyFactory.basic()
+ .withTopics(topicName)
+ .withSslContextParameters("scp", toPath("client-truststore.jks"), "secret")
+ // let's skip host verification as hostname may vary depending on test env
+ .withX509HostnameVerifier("x509HostnameVerifier", NoopHostnameVerifier.class)
+ .withUrl(mockServer.getHostName(), mockServer.getPort(), "ckc")
+ .append("sslContextParameters", "#bean:scp")
+ .append("x509HostnameVerifier", "#bean:x509HostnameVerifier")
+ .buildUrl();
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+
+ private String toPath(String resource) {
+ URL url = Objects.requireNonNull(getClass().getClassLoader().getResource(resource));
+ return url.getPath();
+ }
+}
diff --git a/tests/itests-https/src/test/resources/client-truststore.jks b/tests/itests-https/src/test/resources/client-truststore.jks
new file mode 100644
index 0000000..d4c5d58
Binary files /dev/null and b/tests/itests-https/src/test/resources/client-truststore.jks differ
diff --git a/tests/itests-https/src/test/resources/server-keystore.jks b/tests/itests-https/src/test/resources/server-keystore.jks
new file mode 100644
index 0000000..a230624
Binary files /dev/null and b/tests/itests-https/src/test/resources/server-keystore.jks differ
diff --git a/tests/pom.xml b/tests/pom.xml
index 0c3ff80..cff0878 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -49,6 +49,7 @@
<module>itests-syslog</module>
<module>itests-file</module>
<module>itests-http</module>
+ <module>itests-https</module>
<module>itests-timer</module>
<module>itests-slack</module>
<module>itests-salesforce</module>