You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/26 07:11:36 UTC

[camel-kafka-connector] 01/02: Added Integration tests for Camel-Ssh-Kafka-Connector

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch ssh-itest
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f98f3d9ff81ba5d6fa71d180e3736830cc429ed3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 26 08:09:32 2020 +0100

    Added Integration tests for Camel-Ssh-Kafka-Connector
---
 tests/itests-ssh/pom.xml                           | 70 ++++++++++++++++
 .../kafkaconnector/ssh/services/SshContainer.java  | 45 ++++++++++
 .../ssh/services/SshLocalContainerService.java     | 54 ++++++++++++
 .../ssh/services/SshRemoteService.java             | 49 +++++++++++
 .../kafkaconnector/ssh/services/SshService.java    | 53 ++++++++++++
 .../ssh/services/SshServiceFactory.java            | 45 ++++++++++
 .../ssh/sink/CamelSinkSshITCase.java               | 97 ++++++++++++++++++++++
 .../ssh/sink/CamelSshPropertyFactory.java          | 49 +++++++++++
 .../ssh/source/CamelSourceSshITCase.java           | 83 ++++++++++++++++++
 .../ssh/source/CamelSshPropertyFactory.java        | 56 +++++++++++++
 .../kafkaconnector/ssh/source/SshTransforms.java   | 76 +++++++++++++++++
 11 files changed, 677 insertions(+)

diff --git a/tests/itests-ssh/pom.xml b/tests/itests-ssh/pom.xml
new file mode 100644
index 0000000..36302ed
--- /dev/null
+++ b/tests/itests-ssh/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-ssh</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: SSH</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-ssh</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java
new file mode 100644
index 0000000..12a452d
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+/**
+ * A local instance of an Ubuntu Server with SSH enabled
+ */
+public class SshContainer extends GenericContainer<SshContainer> {
+    private static final int SSH_PORT = 22;
+    private static final String SSH_IMAGE = "rastasheep/ubuntu-sshd:14.04";
+
+    /** Configures the container from the SSH image; it is not started here. */
+    public SshContainer() {
+        super(SSH_IMAGE);
+        // Expose the SSH port and wait until a port is being listened on.
+        this.withExposedPorts(SSH_PORT).waitingFor(Wait.forListeningPort());
+    }
+
+    /** Returns the mapped port for the container's SSH port. */
+    public int getSSHPort() {
+        return this.getMappedPort(SSH_PORT);
+    }
+
+    /** Returns the address reported by {@code getContainerIpAddress()}. */
+    public String getSSHHost() {
+        return this.getContainerIpAddress();
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java
new file mode 100644
index 0000000..9276890
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SshLocalContainerService implements SshService {
+    private static final Logger LOG = LoggerFactory.getLogger(SshLocalContainerService.class);
+
+    private final SshContainer container;
+
+    /** Creates the SSH container and starts it immediately, so initialize() only logs. */
+    public SshLocalContainerService() {
+        this.container = new SshContainer();
+        this.container.start();
+    }
+
+    @Override
+    public String getSshHost() {
+        return this.container.getSSHHost();
+    }
+
+    @Override
+    public int getSshPort() {
+        return this.container.getSSHPort();
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("SSH server running at address {}", getSshEndpoint());
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.info("Stopping the Ssh container");
+        this.container.stop();
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java
new file mode 100644
index 0000000..cb5de9c
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+public class SshRemoteService implements SshService {
+
+    private static final int DEFAULT_SSH_PORT = 22;
+
+    @Override
+    public void initialize() {
+        // Nothing to do: no resources are created by this class.
+    }
+
+    @Override
+    public void shutdown() {
+        // Nothing to do: no resources are held by this class.
+    }
+
+    /**
+     * Reads the port from the "ssh.port" system property, falling back to
+     * {@code DEFAULT_SSH_PORT} when the property is not set.
+     */
+    @Override
+    public int getSshPort() {
+        final String configuredPort = System.getProperty("ssh.port");
+        return configuredPort == null ? DEFAULT_SSH_PORT : Integer.parseInt(configuredPort);
+    }
+
+    /** Returns the "ssh.host" system property, which is null when not set. */
+    @Override
+    public String getSshHost() {
+        return System.getProperty("ssh.host");
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java
new file mode 100644
index 0000000..d5a375e
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface SshService extends BeforeAllCallback, AfterAllCallback {
+
+    /** Host name or address of the SSH server. */
+    String getSshHost();
+
+    /** Port of the SSH server. */
+    int getSshPort();
+
+    /** Joins {@link #getSshHost()} and {@link #getSshPort()} as {@code host:port}. */
+    default String getSshEndpoint() {
+        final String host = getSshHost();
+        return host + ":" + getSshPort();
+    }
+
+    /** Performs whatever set-up the service needs; invoked from {@code beforeAll}. */
+    void initialize();
+
+    /** Shuts the service down; invoked from {@code afterAll}. */
+    void shutdown();
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        this.initialize();
+    }
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        this.shutdown();
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java
new file mode 100644
index 0000000..8e933ef
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class SshServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(SshServiceFactory.class);
+
+    private SshServiceFactory() {
+        // Static factory only; never instantiated.
+    }
+
+    /** Creates the service named by the "ssh.instance.type" system property (local container when unset). */
+    public static SshService createService() {
+        String instanceType = System.getProperty("ssh.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-ssh-container")) {
+            return new SshLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new SshRemoteService();
+        }
+
+        LOG.error("ssh instance must be one of 'local-ssh-container' or 'remote'");
+        throw new UnsupportedOperationException(String.format("Invalid ssh instance type: %s", instanceType));
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
new file mode 100644
index 0000000..96fceb1
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.ssh.services.SshService;
+import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Testcontainers
+public class CamelSinkSshITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static SshService sshService = SshServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class);
+    private final int expect = 3;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-ssh-kafka-connector"};
+    }
+
+    /** Produces {@code expect} "date" messages to the test topic, then releases the latch. */
+    private void putRecords(CountDownLatch latch) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        try {
+            for (int i = 0; i < expect; i++) {
+                try {
+                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date");
+                } catch (ExecutionException e) {
+                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // restore the interrupt flag before stopping
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /** Starts the connector, then waits up to 30 seconds for the producer task to finish. */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+        CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        try {
+            service.submit(() -> putRecords(latch));
+            if (!latch.await(30, TimeUnit.SECONDS)) {
+                fail("Timed out wait for data to be added to the Kafka cluster");
+            }
+        } finally {
+            service.shutdownNow(); // always release the pool's threads, even if the wait fails
+        }
+    }
+
+    @Timeout(90)
+    @Test
+    public void testSshCommand() throws ExecutionException, InterruptedException {
+        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost())
+            .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root");
+        runTest(connectorPropertyFactory);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java
new file mode 100644
index 0000000..9ca3dcb
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.sink;
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+final class CamelSshPropertyFactory extends SinkConnectorPropertyFactory<CamelSshPropertyFactory> {
+    private CamelSshPropertyFactory() {
+        // Instances are obtained through basic().
+    }
+
+    public CamelSshPropertyFactory withHost(String host) {
+        return setProperty("camel.sink.path.host", host);
+    }
+
+    public CamelSshPropertyFactory withPort(String port) {
+        return setProperty("camel.sink.path.port", port);
+    }
+
+    public CamelSshPropertyFactory withUsername(String username) {
+        return setProperty("camel.sink.endpoint.username", username);
+    }
+
+    public CamelSshPropertyFactory withPassword(String password) {
+        return setProperty("camel.sink.endpoint.password", password);
+    }
+
+    /** Sink factory preset: sink connector class, one task, String key/value converters. */
+    public static CamelSshPropertyFactory basic() {
+        return new CamelSshPropertyFactory().withName("CamelSshSinkConnector").withTasksMax(1)
+            .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSinkConnector").withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+            .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
new file mode 100644
index 0000000..8cd9abf
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.ssh.services.SshService;
+import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CamelSourceSshITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static SshService sshService = SshServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class);
+
+    private final int expect = 1;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-ssh-kafka-connector"};
+    }
+
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+        // Count every record handed over by the consumer.
+        LOG.debug("Received: {}", record.value());
+        received++;
+        // NOTE(review): presumably false makes KafkaClient.consume stop after this record - confirm.
+        return false;
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        // JUnit's assertEquals takes (expected, actual, message).
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+    }
+
+    @Timeout(90)
+    @Test
+    public void testRetrieveFromSsh() throws ExecutionException, InterruptedException {
+        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withKafkaTopic(topic).withHost(sshService.getSshHost())
+            .withPort(Integer.toString(sshService.getSshPort())).withDelay(Integer.toString(10000)).withUsername("root").withPassword("root").withPollcommand("date")
+            .withTransformsConfig("SshTransforms").withEntry("type", "org.apache.camel.kafkaconnector.ssh.source.SshTransforms").end();
+
+        runTest(connectorPropertyFactory);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java
new file mode 100644
index 0000000..bccd5ff
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelSshPropertyFactory extends SourceConnectorPropertyFactory<CamelSshPropertyFactory> {
+    private static final String PATH_PREFIX = "camel.source.path.";
+    private static final String ENDPOINT_PREFIX = "camel.source.endpoint.";
+
+    private CamelSshPropertyFactory() { } // created through basic() only
+
+    public CamelSshPropertyFactory withHost(String host) {
+        return setProperty(PATH_PREFIX + "host", host);
+    }
+
+    public CamelSshPropertyFactory withPort(String port) {
+        return setProperty(PATH_PREFIX + "port", port);
+    }
+
+    public CamelSshPropertyFactory withDelay(String value) {
+        return setProperty(ENDPOINT_PREFIX + "delay", value);
+    }
+
+    public CamelSshPropertyFactory withUsername(String username) {
+        return setProperty(ENDPOINT_PREFIX + "username", username);
+    }
+
+    public CamelSshPropertyFactory withPassword(String password) {
+        return setProperty(ENDPOINT_PREFIX + "password", password);
+    }
+
+    public CamelSshPropertyFactory withPollcommand(String pollCommand) {
+        return setProperty(ENDPOINT_PREFIX + "pollCommand", pollCommand);
+    }
+
+    public static CamelSshPropertyFactory basic() {
+        return new CamelSshPropertyFactory().withName("CamelSshSourceConnector").withTasksMax(1)
+            .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSourceConnector").withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java
new file mode 100644
index 0000000..6097d75
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SshTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef().define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                                                                      "Transforms String-based content from Kafka into a map");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SshTransforms.class);
+
+    /** Converts a ByteArrayInputStream value to a String record; other records pass through unchanged. */
+    @Override
+    public R apply(R r) {
+        Object value = r.value();
+
+        if (!(value instanceof ByteArrayInputStream)) {
+            // A null value must not break the logging below.
+            LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass());
+            return r;
+        }
+
+        LOG.debug("Converting record from Ssh Body Result to text");
+        String m;
+        try {
+            m = IOUtils.toString((ByteArrayInputStream)value, Charset.defaultCharset());
+        } catch (IOException e) {
+            // Fail loudly instead of building a record around a null body.
+            throw new IllegalStateException("Unable to read the Ssh body result", e);
+        }
+
+        return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), SchemaHelper.buildSchemaBuilderForType(m), m, r.timestamp());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release: this transformation holds no resources.
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+        // No configuration is read by this transformation.
+    }
+}