You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/08 20:22:46 UTC
[camel-kafka-connector] branch master updated: Cleanup the AWS SQS
Sink test
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 24a04c7 Cleanup the AWS SQS Sink test
new efa78c7 Merge pull request #11 from orpiske/aws-sqs-test-cleanup
24a04c7 is described below
commit 24a04c70972558c75f56e27caa0c75a25d1a868c
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Dec 8 19:08:42 2019 +0100
Cleanup the AWS SQS Sink test
- Prevents the latch from trying to wait for longer than the test
- Ensure the embedded runner instance terminates at the end of the test
---
.../sink/aws/sqs/CamelSinkAWSSQSITCase.java | 39 +++++++++++++---------
1 file changed, 24 insertions(+), 15 deletions(-)
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
index 9167f5d..ab6e7fa 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
@@ -99,7 +99,6 @@ public class CamelSinkAWSSQSITCase {
}
-
private void consumeMessages(CountDownLatch latch) {
try {
awssqsClient.receive(TestCommon.DEFAULT_SQS_QUEUE, this::checkMessages);
@@ -111,38 +110,48 @@ public class CamelSinkAWSSQSITCase {
}
}
+ private void produceMessages() {
+ try {
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(kafka.getBootstrapServers());
+
+ for (int i = 0; i < expect; i++) {
+ kafkaClient.produce(TestCommon.DEFAULT_TEST_TOPIC, "Sink test message " + i);
+ }
+ } catch (Throwable t) {
+ LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
+ fail(String.format("Unable to publish messages to the broker: %s", t.getMessage()));
+ }
+ }
+
@Test(timeout = 90000)
public void testBasicSendReceive() {
try {
- CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(2);
- ExecutorService service = Executors.newFixedThreadPool(2);
- service.submit(() -> kafkaConnectRunner.run());
+ ExecutorService service = Executors.newCachedThreadPool();
+ service.submit(() -> kafkaConnectRunner.run(latch));
LOG.debug("Creating the consumer ...");
service.submit(() -> consumeMessages(latch));
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(kafka.getBootstrapServers());
-
- for (int i = 0; i < expect; i++) {
- kafkaClient.produce(TestCommon.DEFAULT_TEST_TOPIC, "Sink test message " + i);
- }
-
- LOG.debug("Created the consumer ... About to receive messages");
+ LOG.debug("Creating the producer and sending messages ...");
+ produceMessages();
- if (latch.await(120, TimeUnit.SECONDS)) {
+ LOG.debug("Waiting for the test to complete");
+ if (latch.await(80, TimeUnit.SECONDS)) {
Assert.assertTrue("Didn't process the expected amount of messages: " + received + " != " + expect,
received == expect);
} else {
fail("Failed to receive the messages within the specified time");
}
-
- kafkaConnectRunner.stop();
} catch (Exception e) {
LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
+ } finally {
+ kafkaConnectRunner.stop();
}
-
}
+
+
}