You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2021/03/11 05:39:30 UTC

[camel-kafka-connector] 01/02: Add https itests

This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4d52cb141bb3508a68fc54a2fbecf8f8c4d8b7d7
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Mar 11 13:01:11 2021 +0900

    Add https itests
---
 .../common/BasicConnectorPropertyFactory.java      |   4 +
 .../common/ConnectorPropertyFactory.java           |   2 +-
 .../common/SinkConnectorPropertyFactory.java       |   7 +
 tests/itests-https/pom.xml                         |  57 +++++++
 .../https/sink/CamelHTTPSPropertyFactory.java      |  68 ++++++++
 .../https/sink/CamelSinkHTTPSITCase.java           | 179 +++++++++++++++++++++
 .../src/test/resources/client-truststore.jks       | Bin 0 -> 844 bytes
 .../src/test/resources/server-keystore.jks         | Bin 0 -> 2134 bytes
 tests/pom.xml                                      |   1 +
 9 files changed, 317 insertions(+), 1 deletion(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index 0e98490..20c3558 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -100,6 +100,10 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp
         return "#class:" + className;
     }
 
+    public static String classRef(Class<?> clazz) {
+        return "#class:" + clazz.getName();
+    }
+
     public T merge(Properties properties) {
         Set<Map.Entry<Object, Object>> set = properties.entrySet();
         connectorProps.putAll(set.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b)->b)));
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java
index 8ce678d..a89695a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java
@@ -41,6 +41,6 @@ public interface ConnectorPropertyFactory {
         Logger log = LoggerFactory.getLogger(ConnectorPropertyFactory.class);
 
         log.info("Using the following properties for the test: ");
-        properties.entrySet().forEach(entry -> log.info("{}={}", entry.getKey(), entry.getValue()));
+        properties.forEach((key, value) -> log.info("{}={}", key, value));
     }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
index 356ee0d..d8c70f5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
@@ -17,7 +17,10 @@
 
 package org.apache.camel.kafkaconnector.common;
 
+import org.apache.camel.LoggingLevel;
+
 import static org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF;
 
 public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
 
@@ -28,4 +31,8 @@ public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorProper
     public T withSinkUrl(String sinkUrl) {
         return setProperty(CAMEL_SINK_URL_CONF, sinkUrl);
     }
+
+    public T withSinkContentLogginglevel(LoggingLevel level) {
+        return setProperty(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, level.toString());
+    }
 }
diff --git a/tests/itests-https/pom.xml b/tests/itests-https/pom.xml
new file mode 100644
index 0000000..c550e83
--- /dev/null
+++ b/tests/itests-https/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-https</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: HTTPS</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-http</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java
new file mode 100644
index 0000000..31993ca
--- /dev/null
+++ b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.https.sink;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.support.jsse.KeyStoreParameters;
+import org.apache.camel.support.jsse.SSLContextParameters;
+import org.apache.camel.support.jsse.TrustManagersParameters;
+
+final class CamelHTTPSPropertyFactory extends SinkConnectorPropertyFactory<CamelHTTPSPropertyFactory> {
+    private CamelHTTPSPropertyFactory() {
+    }
+
+    public CamelHTTPSPropertyFactory withHttpUri(String uri) {
+        return setProperty("camel.sink.path.httpUri", uri);
+    }
+
+    public CamelHTTPSPropertyFactory withSslContextParameters(String bean, String keyStore, String password) {
+        withBeans("ksp", classRef(KeyStoreParameters.class));
+        withBeans("ksp.resource", keyStore);
+        withBeans("ksp.password", password);
+
+        withBeans("tmp", classRef(TrustManagersParameters.class));
+        withBeans("tmp.keyStore", "#bean:ksp");
+
+        withBeans(bean, classRef(SSLContextParameters.class));
+        withBeans(bean + ".trustManagers", "#bean:tmp");
+
+        return setProperty("camel.sink.endpoint.sslContextParameters", "#bean:" + bean);
+    }
+
+    public CamelHTTPSPropertyFactory withX509HostnameVerifier(String bean, Class<?> verifierClass) {
+        withBeans(bean, classRef(verifierClass));
+        return setProperty("camel.sink.endpoint.x509HostnameVerifier", "#bean:" + bean);
+    }
+
+    public EndpointUrlBuilder<CamelHTTPSPropertyFactory> withUrl(String host, int port, String path) {
+        String url = String.format("https://%s:%s/%s", host, port, path);
+        return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+    }
+
+    public static CamelHTTPSPropertyFactory basic() {
+        return new CamelHTTPSPropertyFactory()
+                .withName("CamelHttpsSinkConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.https.CamelHttpsSinkConnector")
+                .withTasksMax(1)
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withSinkContentLogginglevel(LoggingLevel.DEBUG);
+    }
+}
diff --git a/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java
new file mode 100644
index 0000000..e5d6672
--- /dev/null
+++ b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.https.sink;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkHTTPSITCase extends CamelSinkTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkHTTPSITCase.class);
+
+    private final String host = NetworkUtils.getHostname();
+    private final int port = NetworkUtils.getFreePort(host);
+
+    private MockWebServer mockServer;
+
+    private String topicName;
+
+    private final int expect = 10;
+    private List<RecordedRequest> received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-https-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        topicName = getTopicForTest(this);
+
+        setupHttpsMockServer();
+        received = Collections.emptyList();
+    }
+
+    private void setupHttpsMockServer() throws Exception {
+        KeyStore keyStore = KeyStore.getInstance("JKS");
+        keyStore.load(getClass().getResourceAsStream("/server-keystore.jks"), "secret".toCharArray());
+        KeyManagerFactory kmFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmFactory.init(keyStore, "secret".toCharArray());
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(kmFactory.getKeyManagers(), null, null);
+        mockServer = new MockWebServer();
+        mockServer.useHttps(sslContext.getSocketFactory(), false);
+    }
+
+    private void startMockServer() throws IOException {
+        IntStream.range(0, expect).forEach(i -> {
+            mockServer.enqueue(new MockResponse().setResponseCode(200));
+        });
+        mockServer.start(InetAddress.getByName(host), port);
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (mockServer != null) {
+            mockServer.shutdown();
+        }
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            received = IntStream.range(0, expect).mapToObj(i -> {
+                try {
+                    return mockServer.takeRequest(10, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+                    return null;
+                }
+            }).collect(Collectors.toList());
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        String expected = "Sink test message ";
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(expect, received.size(), "Did not receive the same amount of messages that were sent");
+
+
+            for (RecordedRequest request : received) {
+                String actual = request.getBody().readUtf8();
+                LOG.debug("Received: {} ", actual);
+
+                assertEquals("/ckc", request.getPath(), "Received path differed");
+                assertTrue(actual.startsWith(expected), "Received message content differed");
+            }
+
+            assertEquals(expect, received.size(), "Did not receive the same amount of messages that were sent");
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testBasicSendReceive() throws Exception {
+        startMockServer();
+
+        String uri = mockServer.getHostName() + ":" + mockServer.getPort() + "/ckc";
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPSPropertyFactory.basic()
+                .withTopics(topicName)
+                .withHttpUri(uri)
+                .withSslContextParameters("scp", toPath("client-truststore.jks"), "secret")
+                // let's skip host verification as hostname may vary depending on test env
+                .withX509HostnameVerifier("x509HostnameVerifier", NoopHostnameVerifier.class);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        startMockServer();
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPSPropertyFactory.basic()
+                .withTopics(topicName)
+                .withSslContextParameters("scp", toPath("client-truststore.jks"), "secret")
+                // let's skip host verification as hostname may vary depending on test env
+                .withX509HostnameVerifier("x509HostnameVerifier", NoopHostnameVerifier.class)
+                .withUrl(mockServer.getHostName(), mockServer.getPort(), "ckc")
+                .append("sslContextParameters", "#bean:scp")
+                .append("x509HostnameVerifier", "#bean:x509HostnameVerifier")
+                .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    private String toPath(String resource) {
+        URL url = Objects.requireNonNull(getClass().getClassLoader().getResource(resource));
+        return url.getPath();
+    }
+}
diff --git a/tests/itests-https/src/test/resources/client-truststore.jks b/tests/itests-https/src/test/resources/client-truststore.jks
new file mode 100644
index 0000000..d4c5d58
Binary files /dev/null and b/tests/itests-https/src/test/resources/client-truststore.jks differ
diff --git a/tests/itests-https/src/test/resources/server-keystore.jks b/tests/itests-https/src/test/resources/server-keystore.jks
new file mode 100644
index 0000000..a230624
Binary files /dev/null and b/tests/itests-https/src/test/resources/server-keystore.jks differ
diff --git a/tests/pom.xml b/tests/pom.xml
index 0c3ff80..cff0878 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -49,6 +49,7 @@
         <module>itests-syslog</module>
         <module>itests-file</module>
         <module>itests-http</module>
+        <module>itests-https</module>
         <module>itests-timer</module>
         <module>itests-slack</module>
         <module>itests-salesforce</module>