You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/26 08:19:52 UTC

[camel-kafka-connector] branch master updated (fc1dd52 -> c06f24b)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from fc1dd52  Updated CHANGELOG.md
     new a2d437b  Added Integration tests for Camel-Ssh-Kafka-Connector
     new c06f24b  Added Integration test for Camel-Ssh-Kafka-connector to modules

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/{itests-aws-common => itests-ssh}/pom.xml    | 18 ++++-
 .../kafkaconnector/ssh/services/SshContainer.java} | 22 +++---
 .../ssh/services/SshLocalContainerService.java}    | 29 ++++---
 .../ssh/services/SshRemoteService.java}            | 22 +++---
 .../kafkaconnector/ssh/services/SshService.java}   | 15 ++--
 .../ssh/services/SshServiceFactory.java}           | 20 ++---
 .../ssh/sink/CamelSinkSshITCase.java}              | 90 +++-------------------
 .../ssh/sink/CamelSshPropertyFactory.java          | 49 ++++++++++++
 .../ssh/source/CamelSourceSshITCase.java}          | 66 ++++++----------
 .../ssh/source/CamelSshPropertyFactory.java        | 56 ++++++++++++++
 .../kafkaconnector/ssh/source/SshTransforms.java   | 35 +++++----
 tests/pom.xml                                      |  1 +
 12 files changed, 229 insertions(+), 194 deletions(-)
 copy tests/{itests-aws-common => itests-ssh}/pom.xml (85%)
 copy tests/{itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/services/CassandraContainer.java => itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java} (67%)
 copy tests/{itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/services/CassandraLocalContainerService.java => itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java} (58%)
 copy tests/{itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/RemoteHDFSService.java => itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java} (74%)
 copy tests/{itests-aws-common/src/test/java/org/apache/camel/kafkaconnector/aws/common/services/AWSService.java => itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java} (83%)
 copy tests/{itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java => itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java} (62%)
 copy tests/{itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java => itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java} (51%)
 create mode 100644 tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java
 copy tests/{itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java => itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java} (55%)
 create mode 100644 tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java
 copy connectors/camel-ftps-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/ftps/transformers/FtpsRemoteFileTransforms.java => tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java (56%)


[camel-kafka-connector] 01/02: Added Integration tests for Camel-Ssh-Kafka-Connector

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit a2d437b78c4dc1f8cc1b0e06936d5526d05ccec8
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 26 08:09:32 2020 +0100

    Added Integration tests for Camel-Ssh-Kafka-Connector
---
 tests/itests-ssh/pom.xml                           | 70 ++++++++++++++++
 .../kafkaconnector/ssh/services/SshContainer.java  | 45 ++++++++++
 .../ssh/services/SshLocalContainerService.java     | 54 ++++++++++++
 .../ssh/services/SshRemoteService.java             | 49 +++++++++++
 .../kafkaconnector/ssh/services/SshService.java    | 53 ++++++++++++
 .../ssh/services/SshServiceFactory.java            | 45 ++++++++++
 .../ssh/sink/CamelSinkSshITCase.java               | 97 ++++++++++++++++++++++
 .../ssh/sink/CamelSshPropertyFactory.java          | 49 +++++++++++
 .../ssh/source/CamelSourceSshITCase.java           | 83 ++++++++++++++++++
 .../ssh/source/CamelSshPropertyFactory.java        | 56 +++++++++++++
 .../kafkaconnector/ssh/source/SshTransforms.java   | 76 +++++++++++++++++
 11 files changed, 677 insertions(+)

diff --git a/tests/itests-ssh/pom.xml b/tests/itests-ssh/pom.xml
new file mode 100644
index 0000000..36302ed
--- /dev/null
+++ b/tests/itests-ssh/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-ssh</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: SSH</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-ssh</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java
new file mode 100644
index 0000000..12a452d
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+/**
+ * Testcontainers definition of a local Ubuntu server with SSH enabled, backed by the
+ * {@code rastasheep/ubuntu-sshd:14.04} image.
+ */
+public class SshContainer extends GenericContainer<SshContainer> {
+    private static final int SSH_PORT = 22;
+    private static final String SSH_IMAGE = "rastasheep/ubuntu-sshd:14.04";
+
+    /**
+     * Creates the container definition: exposes the SSH port and uses the
+     * listening-port wait strategy to decide when the container is ready.
+     */
+    public SshContainer() {
+        super(SSH_IMAGE);
+
+        this.withExposedPorts(SSH_PORT)
+            .waitingFor(Wait.forListeningPort());
+    }
+
+    /**
+     * @return the port mapped to the container's SSH port (22)
+     */
+    public int getSSHPort() {
+        final int mappedPort = getMappedPort(SSH_PORT);
+        return mappedPort;
+    }
+
+    /**
+     * @return the address of the container as reported by Testcontainers
+     */
+    public String getSSHHost() {
+        final String address = getContainerIpAddress();
+        return address;
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java
new file mode 100644
index 0000000..9276890
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SshService} implementation backed by a locally started {@link SshContainer}.
+ */
+public class SshLocalContainerService implements SshService {
+    private static final Logger LOG = LoggerFactory.getLogger(SshLocalContainerService.class);
+
+    private final SshContainer container = new SshContainer();
+
+    /**
+     * Starts the SSH container as soon as the service is instantiated, so that host
+     * and port can be queried right after construction.
+     */
+    public SshLocalContainerService() {
+        container.start();
+    }
+
+    /**
+     * Logs the endpoint of the already running container; nothing else needs to be set up.
+     */
+    @Override
+    public void initialize() {
+        LOG.info("SSH server running at address {}", getSshEndpoint());
+    }
+
+    /**
+     * Stops the SSH container.
+     */
+    @Override
+    public void shutdown() {
+        LOG.info("Stopping the Ssh container");
+        container.stop();
+    }
+
+    /**
+     * @return the host of the running container, as reported by {@link SshContainer#getSSHHost()}
+     */
+    @Override
+    public String getSshHost() {
+        return container.getSSHHost();
+    }
+
+    /**
+     * @return the port of the running container, as reported by {@link SshContainer#getSSHPort()}
+     */
+    @Override
+    public int getSshPort() {
+        return container.getSSHPort();
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java
new file mode 100644
index 0000000..cb5de9c
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+/**
+ * {@link SshService} implementation that points at an already running SSH server.
+ * The location is read from the {@code ssh.host} and {@code ssh.port} system properties.
+ */
+public class SshRemoteService implements SshService {
+
+    private static final int DEFAULT_SSH_PORT = 22;
+
+    /**
+     * @return the value of the {@code ssh.host} system property ({@code null} when it is not set)
+     */
+    @Override
+    public String getSshHost() {
+        return System.getProperty("ssh.host");
+    }
+
+    /**
+     * @return the value of the {@code ssh.port} system property parsed as an integer,
+     *         or 22 when the property is not set
+     * @throws NumberFormatException if the property is set but is not a valid integer
+     */
+    @Override
+    public int getSshPort() {
+        final String configuredPort = System.getProperty("ssh.port");
+
+        return configuredPort == null ? DEFAULT_SSH_PORT : Integer.parseInt(configuredPort);
+    }
+
+    /**
+     * Nothing to set up: this class does not manage the remote server.
+     */
+    @Override
+    public void initialize() {
+        // intentionally empty
+    }
+
+    /**
+     * Nothing to tear down: this class does not manage the remote server.
+     */
+    @Override
+    public void shutdown() {
+        // intentionally empty
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java
new file mode 100644
index 0000000..d5a375e
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Abstraction over the SSH server used by the integration tests. It doubles as a
+ * JUnit 5 extension: {@link #initialize()} is invoked from {@code beforeAll} and
+ * {@link #shutdown()} from {@code afterAll}.
+ */
+public interface SshService extends BeforeAllCallback, AfterAllCallback {
+
+    /**
+     * @return the host name or address of the SSH server
+     */
+    String getSshHost();
+
+    /**
+     * @return the port of the SSH server
+     */
+    int getSshPort();
+
+    /**
+     * @return the server location formatted as {@code host:port}
+     */
+    default String getSshEndpoint() {
+        return getSshHost() + ":" + getSshPort();
+    }
+
+    /**
+     * Performs whatever initialization the service requires.
+     */
+    void initialize();
+
+    /**
+     * Shuts the service down once the tests have completed.
+     */
+    void shutdown();
+
+    /**
+     * JUnit callback that delegates to {@link #initialize()}.
+     */
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+
+    /**
+     * JUnit callback that delegates to {@link #shutdown()}.
+     */
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java
new file mode 100644
index 0000000..8e933ef
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the {@link SshService} used by the tests, based on the
+ * {@code ssh.instance.type} system property.
+ */
+public final class SshServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(SshServiceFactory.class);
+
+    private static final String LOCAL_CONTAINER_TYPE = "local-ssh-container";
+    private static final String REMOTE_TYPE = "remote";
+
+    private SshServiceFactory() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Selects the service implementation according to {@code ssh.instance.type}:
+     * <ul>
+     * <li>unset or {@code local-ssh-container}: a {@link SshLocalContainerService}</li>
+     * <li>{@code remote}: a {@link SshRemoteService}</li>
+     * </ul>
+     *
+     * @return the service matching the configured instance type
+     * @throws UnsupportedOperationException if the property holds any other value
+     */
+    public static SshService createService() {
+        String instanceType = System.getProperty("ssh.instance.type");
+
+        if (instanceType == null || LOCAL_CONTAINER_TYPE.equals(instanceType)) {
+            return new SshLocalContainerService();
+        }
+
+        if (REMOTE_TYPE.equals(instanceType)) {
+            return new SshRemoteService();
+        }
+
+        LOG.error("Invalid ssh instance type '{}': must be one of 'local-ssh-container' or 'remote'", instanceType);
+        throw new UnsupportedOperationException(String.format("Invalid ssh instance type: %s", instanceType));
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
new file mode 100644
index 0000000..96fceb1
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.ssh.services.SshService;
+import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration test for the SSH sink connector: it publishes a few {@code date}
+ * commands to the test topic while the connector is configured against the SSH
+ * service. The test only verifies that the records could be produced in time.
+ */
+@Testcontainers
+public class CamelSinkSshITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static SshService sshService = SshServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class);
+
+    // Number of records sent to the test topic
+    private final int expect = 3;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-ssh-kafka-connector"};
+    }
+
+    /**
+     * Produces {@code expect} records containing the {@code date} command and always
+     * releases the latch when done, whether or not producing succeeded.
+     *
+     * @param latch latch counted down once the producing loop has ended
+     */
+    private void putRecords(CountDownLatch latch) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        try {
+            for (int i = 0; i < expect; i++) {
+                try {
+                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date");
+                } catch (ExecutionException e) {
+                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag so the executor can see the interruption
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Initializes the connector and produces the test records on a background thread,
+     * failing the test if that does not complete within 30 seconds.
+     *
+     * @param connectorPropertyFactory the connector configuration to deploy
+     * @throws ExecutionException declared for callers; not thrown directly by this method
+     * @throws InterruptedException if the test thread is interrupted while waiting
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        try {
+            service.submit(() -> putRecords(latch));
+
+            if (!latch.await(30, TimeUnit.SECONDS)) {
+                fail("Timed out waiting for data to be added to the Kafka cluster");
+            }
+        } finally {
+            // Do not leak the worker thread, in particular when the wait timed out
+            service.shutdownNow();
+        }
+    }
+
+    @Timeout(90)
+    @Test
+    public void testSshCommand() throws ExecutionException, InterruptedException {
+        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost())
+            .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root");
+
+        runTest(connectorPropertyFactory);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java
new file mode 100644
index 0000000..9ca3dcb
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.sink;
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/**
+ * Builds the connector properties for the SSH sink connector tests.
+ */
+final class CamelSshPropertyFactory extends SinkConnectorPropertyFactory<CamelSshPropertyFactory> {
+
+    private CamelSshPropertyFactory() {
+        // instances are created through basic()
+    }
+
+    /**
+     * @param host the SSH server host, stored as {@code camel.sink.path.host}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withHost(String host) {
+        return setProperty("camel.sink.path.host", host);
+    }
+
+    /**
+     * @param port the SSH server port, stored as {@code camel.sink.path.port}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withPort(String port) {
+        return setProperty("camel.sink.path.port", port);
+    }
+
+    /**
+     * @param username the login user, stored as {@code camel.sink.endpoint.username}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withUsername(String username) {
+        return setProperty("camel.sink.endpoint.username", username);
+    }
+
+    /**
+     * @param password the login password, stored as {@code camel.sink.endpoint.password}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withPassword(String password) {
+        return setProperty("camel.sink.endpoint.password", password);
+    }
+
+    /**
+     * Creates a factory preconfigured for the SSH <em>sink</em> connector, with a single
+     * task and string converters for both key and value.
+     *
+     * @return the preconfigured factory
+     */
+    public static CamelSshPropertyFactory basic() {
+        // This is the sink-side factory (camel.sink.* properties), so it must point
+        // at the sink connector rather than the source one.
+        return new CamelSshPropertyFactory().withName("CamelSshSinkConnector").withTasksMax(1)
+            .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSinkConnector").withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+            .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
new file mode 100644
index 0000000..8cd9abf
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.ssh.services.SshService;
+import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test for the SSH source connector: the connector polls the SSH
+ * service with the {@code date} command and the test checks that the expected
+ * number of records shows up on the test topic.
+ */
+public class CamelSourceSshITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static SshService sshService = SshServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class);
+
+    // Number of records the test expects to have received when consumption ends
+    private final int expect = 1;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-ssh-kafka-connector"};
+    }
+
+    /**
+     * Record callback handed to the Kafka client: counts the record.
+     *
+     * @param record the record that was consumed
+     * @return always {@code false}
+     */
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+
+        LOG.debug("Received: {}", record.value());
+        received++;
+
+        // NOTE(review): presumably 'false' tells the client to stop consuming - confirm against KafkaClient
+        return false;
+    }
+
+    /**
+     * Initializes the connector, consumes from the test topic and verifies the
+     * number of records received.
+     *
+     * @param connectorPropertyFactory the connector configuration to deploy
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        // JUnit 5 signature is (expected, actual, message)
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+    }
+
+    @Timeout(90)
+    @Test
+    public void testRetrieveFromSsh() throws ExecutionException, InterruptedException {
+        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withKafkaTopic(topic).withHost(sshService.getSshHost())
+            .withPort(Integer.toString(sshService.getSshPort())).withDelay(Integer.toString(10000)).withUsername("root").withPassword("root").withPollcommand("date")
+            .withTransformsConfig("SshTransforms").withEntry("type", "org.apache.camel.kafkaconnector.ssh.source.SshTransforms").end();
+
+        runTest(connectorPropertyFactory);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java
new file mode 100644
index 0000000..bccd5ff
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+/**
+ * Builds the connector properties for the SSH source connector tests.
+ */
+final class CamelSshPropertyFactory extends SourceConnectorPropertyFactory<CamelSshPropertyFactory> {
+
+    private CamelSshPropertyFactory() {
+        // instances are created through basic()
+    }
+
+    /**
+     * Creates a factory preconfigured for the SSH source connector, with a single
+     * task and a string key converter.
+     *
+     * @return the preconfigured factory
+     */
+    public static CamelSshPropertyFactory basic() {
+        return new CamelSshPropertyFactory()
+            .withName("CamelSshSourceConnector")
+            .withTasksMax(1)
+            .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSourceConnector")
+            .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    /**
+     * @param host the SSH server host, stored as {@code camel.source.path.host}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withHost(String host) {
+        return setProperty("camel.source.path.host", host);
+    }
+
+    /**
+     * @param port the SSH server port, stored as {@code camel.source.path.port}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withPort(String port) {
+        return setProperty("camel.source.path.port", port);
+    }
+
+    /**
+     * @param username the login user, stored as {@code camel.source.endpoint.username}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withUsername(String username) {
+        return setProperty("camel.source.endpoint.username", username);
+    }
+
+    /**
+     * @param password the login password, stored as {@code camel.source.endpoint.password}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withPassword(String password) {
+        return setProperty("camel.source.endpoint.password", password);
+    }
+
+    /**
+     * @param pollCommand the command to poll with, stored as {@code camel.source.endpoint.pollCommand}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withPollcommand(String pollCommand) {
+        return setProperty("camel.source.endpoint.pollCommand", pollCommand);
+    }
+
+    /**
+     * @param value the polling delay, stored as {@code camel.source.endpoint.delay}
+     * @return this factory
+     */
+    public CamelSshPropertyFactory withDelay(String value) {
+        return setProperty("camel.source.endpoint.delay", value);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java
new file mode 100644
index 0000000..6097d75
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Connect transformation used by the SSH source test: when the record value is
+ * a {@link ByteArrayInputStream}, it is read into a {@link String} and a new record
+ * carrying that text is returned. Any other record is passed through unchanged.
+ */
+public class SshTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef().define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                                                                      "Transforms String-based content from Kafka into a map");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SshTransforms.class);
+
+    /**
+     * Converts a stream-valued record into a text-valued one.
+     *
+     * @param r the record to transform
+     * @return a new record with the stream content as a string value, or {@code r}
+     *         itself when the value is not a {@link ByteArrayInputStream} (including
+     *         {@code null}) or when the stream cannot be read
+     */
+    @Override
+    public R apply(R r) {
+        final Object value = r.value();
+
+        if (!(value instanceof ByteArrayInputStream)) {
+            // value may be null here, so do not dereference it unconditionally
+            LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass());
+
+            return r;
+        }
+
+        LOG.debug("Converting record from Ssh Body Result to text");
+        final String text;
+        try {
+            text = IOUtils.toString((ByteArrayInputStream) value, Charset.defaultCharset());
+        } catch (IOException e) {
+            // Do not build a record around a null body; report the problem and pass the record through
+            LOG.error("Unable to read the Ssh Body Result, leaving the record unchanged: {}", e.getMessage(), e);
+
+            return r;
+        }
+
+        return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), SchemaHelper.buildSchemaBuilderForType(text), text, r.timestamp());
+    }
+
+    /**
+     * @return the configuration definition of this transformation
+     */
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    /**
+     * No resources are held, so there is nothing to release.
+     */
+    @Override
+    public void close() {
+        // intentionally empty
+    }
+
+    /**
+     * The transformation does not read any configuration value.
+     */
+    @Override
+    public void configure(Map<String, ?> map) {
+        // intentionally empty
+    }
+}


[camel-kafka-connector] 02/02: Added Integration test for Camel-Ssh-Kafka-connector to modules

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c06f24bd14992319def6cedbd6e7625ef0ed8733
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 26 08:10:14 2020 +0100

    Added Integration test for Camel-Ssh-Kafka-connector to modules
---
 tests/pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/pom.xml b/tests/pom.xml
index ea303fa..f32950b 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -62,6 +62,7 @@
         <module>perf-tests-rabbitmq</module>
         <module>itests-rabbitmq</module>
         <module>itests-couchbase</module>
+        <module>itests-ssh</module>
     </modules>