You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:50 UTC

[camel-kafka-connector] 10/14: Converted the SSH source test case to use the reusable source base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 550d1a2a64771200568ae40be24e5af4aa5448f8
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 09:14:35 2021 +0100

    Converted the SSH source test case to use the reusable source base class
---
 .../ssh/sink/CamelSinkSshITCase.java               |  3 +-
 .../ssh/source/CamelSourceSshITCase.java           | 47 +++++++---------------
 2 files changed, 17 insertions(+), 33 deletions(-)

diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index d0535d4..02f6f21 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.ssh.services.SshService;
 import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkSshITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static SshService sshService = SshServiceFactory.createService();
@@ -69,7 +71,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
     }
 
 
-
     @Override
     protected void consumeMessages(CountDownLatch latch) {
         latch.countDown();
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
index 6673c01..488029d 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
@@ -19,36 +19,29 @@ package org.apache.camel.kafkaconnector.ssh.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.ssh.services.SshService;
 import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceSshITCase extends AbstractKafkaTest {
+public class CamelSourceSshITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static SshService sshService = SshServiceFactory.createService();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class);
-
     private final int expect = 1;
-    private int received;
     private String oldUserHome = System.getProperty("user.home");
 
     @Override
@@ -56,41 +49,32 @@ public class CamelSourceSshITCase extends AbstractKafkaTest {
         return new String[] {"camel-ssh-kafka-connector"};
     }
 
-    @BeforeEach
+    @BeforeAll
     public void setupKeyHome() {
         System.setProperty("user.home", "target/user-home");
     }
 
-    @AfterEach
+    @AfterAll
     public void tearDownKeyHome() {
         System.setProperty("user.home", oldUserHome);
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-
-        LOG.debug("Received: {}", record.value());
-        received++;
+    @Override
+    protected void produceTestData() {
 
-        return false;
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Timeout(90)
     @Test
     public void testRetrieveFromSsh() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+        String topic = getTopicForTest(this);
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory
                 .basic()
@@ -105,7 +89,6 @@ public class CamelSourceSshITCase extends AbstractKafkaTest {
                     .withEntry("type", "org.apache.camel.kafkaconnector.ssh.transformers.SshTransforms")
                 .end();
 
-
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topic, expect);
     }
 }