You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/12 15:43:46 UTC

[camel-kafka-connector] branch master updated: Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 711d6c1  Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions
711d6c1 is described below

commit 711d6c17f38e74d3cd96fd7beb2336fd6910ec03
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:08:21 2021 +0100

    Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions
---
 .../common/test/CamelSinkTestSupport.java          | 25 ++++++++++++++++++++++
 .../ssh/sink/CamelSinkSshITCase.java               |  2 +-
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index b414726..ec9e9dc 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -97,6 +97,31 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         verifyMessages(latch);
     }
 
+    /**
+     * A simple test runner that initializes the connector without blocking on its startup and then follows the steps: start consumer, produce messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @throws Exception For test-specific exceptions
+     */
+    protected void runTestNonBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Creating the consumer ...");
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        service.submit(() -> consumeMessages(latch));
+
+        producer.produceMessages();
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
+
+        LOG.debug("Waiting for the test to complete");
+        verifyMessages(latch);
+    }
+
 
     protected boolean waitForData() {
         try {
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 02f6f21..abfdccb 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -94,6 +94,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
                 .withUsername("root")
                 .withPassword("root");
 
-        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
+        runTestNonBlocking(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
     }
 }