You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:50 UTC

[camel-kafka-connector] 17/18: Convert the SSH tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 36f3f9be64e3c4bcece6291742449342ecd69ba5
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:51:28 2021 +0100

    Convert the SSH tests to the new reusable sink test base class
---
 .../ssh/sink/CamelSinkSshITCase.java               | 67 +++++++++++-----------
 1 file changed, 32 insertions(+), 35 deletions(-)

diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index cf7e9dd..1c71719 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -17,18 +17,16 @@
 
 package org.apache.camel.kafkaconnector.ssh.sink;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.ssh.services.SshService;
 import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
@@ -40,46 +38,42 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
-public class CamelSinkSshITCase extends AbstractKafkaTest {
+public class CamelSinkSshITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static SshService sshService = SshServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class);
 
     private final int expect = 3;
+    private String topic;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-ssh-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        try {
-            for (int i = 0; i < expect; i++) {
-                try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date");
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
-        } finally {
-            latch.countDown();
-        }
+    @BeforeEach
+    public void setUp() {
+        topic = TestUtils.getDefaultTestTopic(this.getClass());
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
+    @Override
+    protected String testMessageContent(int current) {
+        return "date";
+    }
 
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        latch.countDown();
+    }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (!latch.await(30, TimeUnit.SECONDS)) {
             fail("Timed out wait for data to be added to the Kafka cluster");
         }
@@ -87,12 +81,15 @@ public class CamelSinkSshITCase extends AbstractKafkaTest {
 
     @Timeout(90)
     @Test
-    public void testSshCommand() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
-        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost())
-            .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root");
-
-        runTest(connectorPropertyFactory);
+    public void testSshCommand() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory
+                .basic()
+                .withTopics(topic)
+                .withHost(sshService.getSshHost())
+                .withPort(Integer.toString(sshService.getSshPort()))
+                .withUsername("root")
+                .withPassword("root");
+
+        runTest(connectorPropertyFactory, topic, expect);
     }
 }