You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/12 15:43:46 UTC
[camel-kafka-connector] branch master updated: Avoid blocking
connector initialization on SSH tests as it leads to failures on GH actions
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 711d6c1 Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions
711d6c1 is described below
commit 711d6c17f38e74d3cd96fd7beb2336fd6910ec03
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:08:21 2021 +0100
Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions
---
.../common/test/CamelSinkTestSupport.java | 25 ++++++++++++++++++++++
.../ssh/sink/CamelSinkSshITCase.java | 2 +-
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index b414726..ec9e9dc 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -97,6 +97,31 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
verifyMessages(latch);
}
+ /**
+ * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
+ * @throws Exception For test-specific exceptions
+ */
+ protected void runTestNonBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception {
+ connectorPropertyFactory.log();
+ getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+ LOG.debug("Creating the consumer ...");
+ ExecutorService service = Executors.newCachedThreadPool();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ service.submit(() -> consumeMessages(latch));
+
+ producer.produceMessages();
+
+ LOG.debug("Waiting for the messages to be processed");
+ service.shutdown();
+
+ LOG.debug("Waiting for the test to complete");
+ verifyMessages(latch);
+ }
+
protected boolean waitForData() {
try {
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 02f6f21..abfdccb 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -94,6 +94,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
.withUsername("root")
.withPassword("root");
- runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
+ runTestNonBlocking(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
}
}