You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/26 07:11:36 UTC
[camel-kafka-connector] 01/02: Added Integration tests for
Camel-Ssh-Kafka-Connector
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch ssh-itest
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f98f3d9ff81ba5d6fa71d180e3736830cc429ed3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 26 08:09:32 2020 +0100
Added Integration tests for Camel-Ssh-Kafka-Connector
---
tests/itests-ssh/pom.xml | 70 ++++++++++++++++
.../kafkaconnector/ssh/services/SshContainer.java | 45 ++++++++++
.../ssh/services/SshLocalContainerService.java | 54 ++++++++++++
.../ssh/services/SshRemoteService.java | 49 +++++++++++
.../kafkaconnector/ssh/services/SshService.java | 53 ++++++++++++
.../ssh/services/SshServiceFactory.java | 45 ++++++++++
.../ssh/sink/CamelSinkSshITCase.java | 97 ++++++++++++++++++++++
.../ssh/sink/CamelSshPropertyFactory.java | 49 +++++++++++
.../ssh/source/CamelSourceSshITCase.java | 83 ++++++++++++++++++
.../ssh/source/CamelSshPropertyFactory.java | 56 +++++++++++++
.../kafkaconnector/ssh/source/SshTransforms.java | 76 +++++++++++++++++
11 files changed, 677 insertions(+)
diff --git a/tests/itests-ssh/pom.xml b/tests/itests-ssh/pom.xml
new file mode 100644
index 0000000..36302ed
--- /dev/null
+++ b/tests/itests-ssh/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Parent POM is resolved from the sibling itests-parent module (see relativePath). -->
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-ssh</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: SSH</name>
+
+ <dependencies>
+ <!-- Test-scoped test-jar of itests-common. The SSH tests import classes from the
+      org.apache.camel.kafkaconnector.common package (e.g. AbstractKafkaTest, KafkaClient);
+      presumably they come from this artifact - TODO confirm. -->
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- No version is declared here; presumably it is managed by a parent POM - TODO confirm. -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-ssh</artifactId>
+ </dependency>
+
+ <!-- Needed by SshTransforms, which calls org.apache.commons.io.IOUtils.
+      NOTE(review): the version is hard-coded in this module - confirm it should not be
+      managed centrally. -->
+<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.8.0</version>
+</dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Runs the maven-jar-plugin test-jar goal in the test-compile phase. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java
new file mode 100644
index 0000000..12a452d
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+/**
+ * Testcontainers wrapper around the {@code rastasheep/ubuntu-sshd:14.04} image, giving the
+ * tests a throw-away SSH server.
+ */
+public class SshContainer extends GenericContainer<SshContainer> {
+    private static final String IMAGE_NAME = "rastasheep/ubuntu-sshd:14.04";
+    private static final int CONTAINER_SSH_PORT = 22;
+
+    /**
+     * Configures the container to expose the SSH port and to wait until the exposed port is
+     * listening before it is considered started.
+     */
+    public SshContainer() {
+        super(IMAGE_NAME);
+
+        withExposedPorts(CONTAINER_SSH_PORT).waitingFor(Wait.forListeningPort());
+    }
+
+    /**
+     * @return the port that the container's SSH port (22) is mapped to
+     */
+    public int getSSHPort() {
+        final int mappedPort = getMappedPort(CONTAINER_SSH_PORT);
+
+        return mappedPort;
+    }
+
+    /**
+     * @return the address reported by the container for reaching its mapped ports
+     */
+    public String getSSHHost() {
+        final String address = getContainerIpAddress();
+
+        return address;
+    }
+
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java
new file mode 100644
index 0000000..9276890
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SshService} implementation backed by an {@link SshContainer} that is created and
+ * started when this service is constructed.
+ */
+public class SshLocalContainerService implements SshService {
+    private static final Logger LOG = LoggerFactory.getLogger(SshLocalContainerService.class);
+
+    private final SshContainer container = new SshContainer();
+
+    /**
+     * Starts the underlying container immediately.
+     */
+    public SshLocalContainerService() {
+        container.start();
+    }
+
+    /**
+     * Only logs the endpoint; the container is already started by the constructor.
+     */
+    @Override
+    public void initialize() {
+        LOG.info("SSH server running at address {}", getSshEndpoint());
+    }
+
+    /**
+     * Stops the underlying container.
+     */
+    @Override
+    public void shutdown() {
+        LOG.info("Stopping the Ssh container");
+        container.stop();
+    }
+
+    /**
+     * @return the host reported by the container
+     */
+    @Override
+    public String getSshHost() {
+        return container.getSSHHost();
+    }
+
+    /**
+     * @return the mapped SSH port reported by the container
+     */
+    @Override
+    public int getSshPort() {
+        return container.getSSHPort();
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java
new file mode 100644
index 0000000..cb5de9c
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+/**
+ * {@link SshService} implementation for an SSH server that is managed outside of the test.
+ * Its location is read from the {@code ssh.host} and {@code ssh.port} system properties.
+ */
+public class SshRemoteService implements SshService {
+
+    private static final int DEFAULT_SSH_PORT = 22;
+
+    /**
+     * @return the value of the {@code ssh.host} system property, or {@code null} when unset
+     */
+    @Override
+    public String getSshHost() {
+        return System.getProperty("ssh.host");
+    }
+
+    /**
+     * @return the {@code ssh.port} system property parsed as an integer, or 22 when unset
+     * @throws NumberFormatException if the property is set but is not a valid integer
+     */
+    @Override
+    public int getSshPort() {
+        final String configuredPort = System.getProperty("ssh.port");
+
+        return configuredPort == null ? DEFAULT_SSH_PORT : Integer.parseInt(configuredPort);
+    }
+
+    @Override
+    public void initialize() {
+        // nothing to set up in this implementation
+    }
+
+    @Override
+    public void shutdown() {
+        // nothing to tear down in this implementation
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java
new file mode 100644
index 0000000..d5a375e
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Abstraction of the SSH server used by the tests. It doubles as a JUnit 5 extension:
+ * {@link #initialize()} is invoked from {@code beforeAll} and {@link #shutdown()} from
+ * {@code afterAll}.
+ */
+public interface SshService extends BeforeAllCallback, AfterAllCallback {
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+    /**
+     * @return the host name or address of the SSH server
+     */
+    String getSshHost();
+
+    /**
+     * @return the port of the SSH server
+     */
+    int getSshPort();
+
+    /**
+     * @return the server location formatted as {@code host:port}
+     */
+    default String getSshEndpoint() {
+        final String host = getSshHost();
+        final int port = getSshPort();
+
+        return host + ":" + port;
+    }
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java
new file mode 100644
index 0000000..8e933ef
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the {@link SshService} used by the tests, selected through the
+ * {@code ssh.instance.type} system property.
+ */
+public final class SshServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(SshServiceFactory.class);
+
+    private SshServiceFactory() {
+        // static factory only
+    }
+
+    /**
+     * Builds the service matching the {@code ssh.instance.type} system property.
+     * <ul>
+     * <li>unset or {@code local-ssh-container}: a {@link SshLocalContainerService}</li>
+     * <li>{@code remote}: a {@link SshRemoteService}</li>
+     * </ul>
+     *
+     * @return the SSH service for the requested instance type
+     * @throws UnsupportedOperationException if the property holds any other value
+     */
+    public static SshService createService() {
+        String instanceType = System.getProperty("ssh.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-ssh-container")) {
+            return new SshLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new SshRemoteService();
+        }
+
+        // Messages previously mentioned "rabbitmq" and had an unbalanced quote around 'remote'.
+        LOG.error("ssh instance must be one of 'local-ssh-container' or 'remote'");
+        throw new UnsupportedOperationException(String.format("Invalid ssh instance type: %s", instanceType));
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
new file mode 100644
index 0000000..96fceb1
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.ssh.services.SshService;
+import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration test for the SSH sink connector: it starts the connector against the SSH
+ * service and produces records whose value is the string {@code date} to the test topic.
+ * <p>
+ * NOTE(review): the test only verifies that the records could be produced within the
+ * timeout; nothing is asserted on the SSH side.
+ */
+@Testcontainers
+public class CamelSinkSshITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static SshService sshService = SshServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class);
+
+    /** Number of records sent to the test topic. */
+    private final int expect = 3;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-ssh-kafka-connector"};
+    }
+
+    /**
+     * Produces {@link #expect} records with the value {@code date} to the default test topic.
+     * The latch is always counted down, even if producing fails or is interrupted.
+     *
+     * @param latch latch released once this method has finished sending
+     */
+    private void putRecords(CountDownLatch latch) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        try {
+            for (int i = 0; i < expect; i++) {
+                try {
+                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date");
+                } catch (ExecutionException e) {
+                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    // Restore the flag so the executor thread still observes the interruption.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Initializes the connector and waits up to 30 seconds for the records to be produced.
+     *
+     * @param connectorPropertyFactory the connector configuration to deploy
+     * @throws ExecutionException declared for callers; see the connector initialization
+     * @throws InterruptedException if interrupted while waiting for the producer to finish
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        try {
+            service.submit(() -> putRecords(latch));
+
+            if (!latch.await(30, TimeUnit.SECONDS)) {
+                fail("Timed out wait for data to be added to the Kafka cluster");
+            }
+        } finally {
+            // Always release the pool; on timeout this also interrupts the producer task.
+            service.shutdownNow();
+        }
+    }
+
+    @Timeout(90)
+    @Test
+    public void testSshCommand() throws ExecutionException, InterruptedException {
+        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost())
+                .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root");
+
+        runTest(connectorPropertyFactory);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java
new file mode 100644
index 0000000..9ca3dcb
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.sink;
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/**
+ * Builds the connector properties for the SSH <em>sink</em> connector test. All
+ * connector-specific keys use the {@code camel.sink.*} prefix.
+ */
+final class CamelSshPropertyFactory extends SinkConnectorPropertyFactory<CamelSshPropertyFactory> {
+
+    private CamelSshPropertyFactory() {
+        // instances are created through basic()
+    }
+
+    /**
+     * @param host value for {@code camel.sink.path.host}
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelSshPropertyFactory withHost(String host) {
+        return setProperty("camel.sink.path.host", host);
+    }
+
+    /**
+     * @param port value for {@code camel.sink.path.port}
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelSshPropertyFactory withPort(String port) {
+        return setProperty("camel.sink.path.port", port);
+    }
+
+    /**
+     * @param username value for {@code camel.sink.endpoint.username}
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelSshPropertyFactory withUsername(String username) {
+        return setProperty("camel.sink.endpoint.username", username);
+    }
+
+    /**
+     * @param password value for {@code camel.sink.endpoint.password}
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelSshPropertyFactory withPassword(String password) {
+        return setProperty("camel.sink.endpoint.password", password);
+    }
+
+    /**
+     * Creates a factory preconfigured with one task, the SSH sink connector class and string
+     * key/value converters.
+     * <p>
+     * The connector name and class previously pointed at the source connector
+     * ({@code CamelSshSourceConnector}), which contradicts the {@code camel.sink.*} keys set
+     * by this factory.
+     *
+     * @return the preconfigured factory
+     */
+    public static CamelSshPropertyFactory basic() {
+        return new CamelSshPropertyFactory().withName("CamelSshSinkConnector").withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSinkConnector").withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
new file mode 100644
index 0000000..8cd9abf
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.ssh.services.SshService;
+import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test for the SSH source connector: the connector is configured with the poll
+ * command {@code date} and the test checks that a record arrives on the test topic.
+ */
+public class CamelSourceSshITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static SshService sshService = SshServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class);
+
+    /** Number of records the test expects to have counted. */
+    private final int expect = 1;
+    /** Number of records seen by {@link #checkRecord}. */
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-ssh-kafka-connector"};
+    }
+
+    /**
+     * Counts the consumed record.
+     *
+     * @param record the record read from the test topic
+     * @return always {@code false}; presumably this tells {@code KafkaClient.consume} to stop
+     *         after the first record - TODO confirm against KafkaClient
+     */
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+
+        LOG.debug("Received: {}", record.value());
+        received++;
+
+        return false;
+    }
+
+    /**
+     * Initializes the connector, consumes from the test topic and verifies the record count.
+     *
+     * @param connectorPropertyFactory the connector configuration to deploy
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        // JUnit 5 signature is assertEquals(expected, actual, message); the arguments were swapped.
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+    }
+
+    @Timeout(90)
+    @Test
+    public void testRetrieveFromSsh() throws ExecutionException, InterruptedException {
+        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withKafkaTopic(topic).withHost(sshService.getSshHost())
+                .withPort(Integer.toString(sshService.getSshPort())).withDelay(Integer.toString(10000)).withUsername("root").withPassword("root").withPollcommand("date")
+                .withTransformsConfig("SshTransforms").withEntry("type", "org.apache.camel.kafkaconnector.ssh.source.SshTransforms").end();
+
+        runTest(connectorPropertyFactory);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java
new file mode 100644
index 0000000..bccd5ff
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+/**
+ * Builds the connector properties for the SSH <em>source</em> connector test. All
+ * connector-specific keys use the {@code camel.source.*} prefix.
+ */
+final class CamelSshPropertyFactory extends SourceConnectorPropertyFactory<CamelSshPropertyFactory> {
+
+    private CamelSshPropertyFactory() {
+        // instances are created through basic()
+    }
+
+    /**
+     * Creates a factory preconfigured with one task, the SSH source connector class and a
+     * string key converter.
+     *
+     * @return the preconfigured factory
+     */
+    public static CamelSshPropertyFactory basic() {
+        return new CamelSshPropertyFactory()
+                .withName("CamelSshSourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    /** Sets {@code camel.source.path.host}. */
+    public CamelSshPropertyFactory withHost(String host) {
+        return setProperty("camel.source.path.host", host);
+    }
+
+    /** Sets {@code camel.source.path.port}. */
+    public CamelSshPropertyFactory withPort(String port) {
+        return setProperty("camel.source.path.port", port);
+    }
+
+    /** Sets {@code camel.source.endpoint.username}. */
+    public CamelSshPropertyFactory withUsername(String username) {
+        return setProperty("camel.source.endpoint.username", username);
+    }
+
+    /** Sets {@code camel.source.endpoint.password}. */
+    public CamelSshPropertyFactory withPassword(String password) {
+        return setProperty("camel.source.endpoint.password", password);
+    }
+
+    /** Sets {@code camel.source.endpoint.pollCommand}. */
+    public CamelSshPropertyFactory withPollcommand(String pollCommand) {
+        return setProperty("camel.source.endpoint.pollCommand", pollCommand);
+    }
+
+    /** Sets {@code camel.source.endpoint.delay}. */
+    public CamelSshPropertyFactory withDelay(String value) {
+        return setProperty("camel.source.endpoint.delay", value);
+    }
+}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java
new file mode 100644
index 0000000..6097d75
--- /dev/null
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.ssh.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Connect transformation that converts a record whose value is a
+ * {@link ByteArrayInputStream} into a record carrying the stream content as a {@link String}.
+ * Records with any other value type are passed through unchanged.
+ *
+ * @param <R> the concrete record type
+ */
+public class SshTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    // NOTE(review): the documentation string below does not describe what apply() does - confirm.
+    public static final ConfigDef CONFIG_DEF = new ConfigDef().define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Transforms String-based content from Kafka into a map");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SshTransforms.class);
+
+    /**
+     * Converts a stream-valued record to a text-valued record.
+     *
+     * @param r the incoming record
+     * @return a new record with the stream content as its value, or {@code r} itself when the
+     *         value is not a {@link ByteArrayInputStream} (including {@code null})
+     * @throws IllegalStateException if the stream cannot be read
+     */
+    @Override
+    public R apply(R r) {
+        Object value = r.value();
+
+        if (!(value instanceof ByteArrayInputStream)) {
+            // value may be null, so do not dereference it unconditionally
+            LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass());
+
+            return r;
+        }
+
+        LOG.debug("Converting record from Ssh Body Result to text");
+        String text;
+        try {
+            // NOTE(review): decoding uses the platform default charset - confirm this is intended.
+            text = IOUtils.toString((ByteArrayInputStream) value, Charset.defaultCharset());
+        } catch (IOException e) {
+            // Fail loudly instead of printing the stack trace and emitting a null payload.
+            throw new IllegalStateException("Unable to read the SSH result body", e);
+        }
+
+        return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), SchemaHelper.buildSchemaBuilderForType(text), text, r.timestamp());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+        // no configuration is read
+    }
+}