You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/09 14:39:07 UTC
[camel-kafka-connector] 09/11: Converted the SSH source test case to use the reusable source base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit dafface9f37c8842ed9b69d6a11aa0e8ff440fc1
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 09:14:35 2021 +0100
Converted the SSH source test case to use the reusable source base class
---
.../ssh/sink/CamelSinkSshITCase.java | 3 +-
.../ssh/source/CamelSourceSshITCase.java | 47 +++++++---------------
2 files changed, 17 insertions(+), 33 deletions(-)
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index d0535d4..02f6f21 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.ssh.services.SshService;
import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
disabledReason = "Hangs when running with the embedded Kafka Connect instance")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkSshITCase extends CamelSinkTestSupport {
@RegisterExtension
public static SshService sshService = SshServiceFactory.createService();
@@ -69,7 +71,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
}
-
@Override
protected void consumeMessages(CountDownLatch latch) {
latch.countDown();
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
index 6673c01..488029d 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
@@ -19,36 +19,29 @@ package org.apache.camel.kafkaconnector.ssh.source;
import java.util.concurrent.ExecutionException;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.ssh.services.SshService;
import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
@DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
disabledReason = "Hangs when running with the embedded Kafka Connect instance")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceSshITCase extends AbstractKafkaTest {
+public class CamelSourceSshITCase extends CamelSourceTestSupport {
@RegisterExtension
public static SshService sshService = SshServiceFactory.createService();
- private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class);
-
private final int expect = 1;
- private int received;
private String oldUserHome = System.getProperty("user.home");
@Override
@@ -56,41 +49,32 @@ public class CamelSourceSshITCase extends AbstractKafkaTest {
return new String[] {"camel-ssh-kafka-connector"};
}
- @BeforeEach
+ @BeforeAll
public void setupKeyHome() {
System.setProperty("user.home", "target/user-home");
}
- @AfterEach
+ @AfterAll
public void tearDownKeyHome() {
System.setProperty("user.home", oldUserHome);
}
- private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-
- LOG.debug("Received: {}", record.value());
- received++;
+ @Override
+ protected void produceTestData() {
- return false;
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
-
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
-
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
assertEquals(received, expect, "Didn't process the expected amount of messages");
}
+
@Timeout(90)
@Test
public void testRetrieveFromSsh() throws ExecutionException, InterruptedException {
- String topic = TestUtils.getDefaultTestTopic(this.getClass());
+ String topic = getTopicForTest(this);
ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory
.basic()
@@ -105,7 +89,6 @@ public class CamelSourceSshITCase extends AbstractKafkaTest {
.withEntry("type", "org.apache.camel.kafkaconnector.ssh.transformers.SshTransforms")
.end();
-
- runTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topic, expect);
}
}