You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:13:00 UTC
[camel-kafka-connector] 17/22: Convert the SSH tests to the new
reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f3cd0607448e328cc9b2121258759721dd1b19a6
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:51:28 2021 +0100
Convert the SSH tests to the new reusable sink test base class
---
.../ssh/sink/CamelSinkSshITCase.java | 67 +++++++++++-----------
1 file changed, 32 insertions(+), 35 deletions(-)
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index cf7e9dd..1c71719 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -17,18 +17,16 @@
package org.apache.camel.kafkaconnector.ssh.sink;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.ssh.services.SshService;
import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
@@ -40,46 +38,42 @@ import static org.junit.jupiter.api.Assertions.fail;
@DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
disabledReason = "Hangs when running with the embedded Kafka Connect instance")
-public class CamelSinkSshITCase extends AbstractKafkaTest {
+public class CamelSinkSshITCase extends CamelSinkTestSupport {
@RegisterExtension
public static SshService sshService = SshServiceFactory.createService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class);
private final int expect = 3;
+ private String topic;
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-ssh-kafka-connector"};
}
- private void putRecords(CountDownLatch latch) {
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
- try {
- for (int i = 0; i < expect; i++) {
- try {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date");
- } catch (ExecutionException e) {
- LOG.error("Unable to produce messages: {}", e.getMessage(), e);
- } catch (InterruptedException e) {
- break;
- }
- }
- } finally {
- latch.countDown();
- }
+ @BeforeEach
+ public void setUp() {
+ topic = TestUtils.getDefaultTestTopic(this.getClass());
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
+ @Override
+ protected String testMessageContent(int current) {
+ return "date";
+ }
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService service = Executors.newCachedThreadPool();
- service.submit(() -> putRecords(latch));
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ latch.countDown();
+ }
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (!latch.await(30, TimeUnit.SECONDS)) {
fail("Timed out wait for data to be added to the Kafka cluster");
}
@@ -87,12 +81,15 @@ public class CamelSinkSshITCase extends AbstractKafkaTest {
@Timeout(90)
@Test
- public void testSshCommand() throws ExecutionException, InterruptedException {
- String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
- ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost())
- .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root");
-
- runTest(connectorPropertyFactory);
+ public void testSshCommand() throws Exception {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory
+ .basic()
+ .withTopics(topic)
+ .withHost(sshService.getSshHost())
+ .withPort(Integer.toString(sshService.getSshPort()))
+ .withUsername("root")
+ .withPassword("root");
+
+ runTest(connectorPropertyFactory, topic, expect);
}
}