You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/08 20:22:46 UTC

[camel-kafka-connector] branch master updated: Cleanup the AWS SQS Sink test

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 24a04c7  Cleanup the AWS SQS Sink test
     new efa78c7  Merge pull request #11 from orpiske/aws-sqs-test-cleanup
24a04c7 is described below

commit 24a04c70972558c75f56e27caa0c75a25d1a868c
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Dec 8 19:08:42 2019 +0100

    Cleanup the AWS SQS Sink test
    
    - Prevents the latch from trying to wait for longer than the test
    - Ensure the embedded runner instance terminates at the end of the test
---
 .../sink/aws/sqs/CamelSinkAWSSQSITCase.java        | 39 +++++++++++++---------
 1 file changed, 24 insertions(+), 15 deletions(-)

diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
index 9167f5d..ab6e7fa 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
@@ -99,7 +99,6 @@ public class CamelSinkAWSSQSITCase {
     }
 
 
-
     private void consumeMessages(CountDownLatch latch) {
         try {
             awssqsClient.receive(TestCommon.DEFAULT_SQS_QUEUE, this::checkMessages);
@@ -111,38 +110,48 @@ public class CamelSinkAWSSQSITCase {
         }
     }
 
+    private void produceMessages()  {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(kafka.getBootstrapServers());
+            // Publish 'expect' numbered test messages to the default test topic; any failure fails the test.
+            for (int i = 0; i < expect; i++) {
+                kafkaClient.produce(TestCommon.DEFAULT_TEST_TOPIC, "Sink test message " + i);
+            }
+        } catch (Throwable t) {
+            LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
+            fail(String.format("Unable to publish messages to the broker: %s", t.getMessage())); // %s, not SLF4J's {}
+        }
+    }
+
 
     @Test(timeout = 90000)
     public void testBasicSendReceive() {
         try {
-            CountDownLatch latch = new CountDownLatch(1);
+            CountDownLatch latch = new CountDownLatch(2);
 
-            ExecutorService service = Executors.newFixedThreadPool(2);
-            service.submit(() -> kafkaConnectRunner.run());
+            ExecutorService service = Executors.newCachedThreadPool();
+            service.submit(() -> kafkaConnectRunner.run(latch));
 
             LOG.debug("Creating the consumer ...");
             service.submit(() -> consumeMessages(latch));
 
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(kafka.getBootstrapServers());
-
-            for (int i = 0; i < expect; i++) {
-                kafkaClient.produce(TestCommon.DEFAULT_TEST_TOPIC, "Sink test message " + i);
-            }
-
-            LOG.debug("Created the consumer ... About to receive messages");
+            LOG.debug("Creating the producer and sending messages ...");
+            produceMessages();
 
-            if (latch.await(120, TimeUnit.SECONDS)) {
+            LOG.debug("Waiting for the test to complete");
+            if (latch.await(80, TimeUnit.SECONDS)) {
                 Assert.assertTrue("Didn't process the expected amount of messages: " + received + " != " + expect,
                         received == expect);
             } else {
                 fail("Failed to receive the messages within the specified time");
             }
-
-            kafkaConnectRunner.stop();
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
+        } finally {
+            kafkaConnectRunner.stop();
         }
-
     }
+
+
 }