You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2021/02/15 08:09:25 UTC

[camel] 02/11: Fix failing pulsar unit test

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5386cc0fa39d3930250480636374563f993c7c21
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Feb 5 09:16:20 2021 +0100

    Fix failing pulsar unit test
---
 .../pulsar/PulsarConsumerAcknowledgementTest.java  | 73 ++++++++++++----------
 .../pulsar/PulsarConsumerDeadLetterPolicyTest.java | 19 +++---
 2 files changed, 53 insertions(+), 39 deletions(-)

diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 3019e8c..f0c3eb5 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -18,9 +18,9 @@ package org.apache.camel.component.pulsar;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.pulsar.utils.AutoConfiguration;
@@ -43,27 +43,22 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.test.junit5.TestSupport.body;
-
 public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
 
     @RegisterExtension
     static PulsarService service = PulsarServiceFactory.createService();
 
     private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerAcknowledgementTest.class);
-    private static final String TOPIC_URI = "persistent://public/default/camel-topic-1";
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic-";
 
-    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000"
-                    + "&negativeAckRedeliveryDelayMicros=100000")
     private Endpoint from;
 
-    @EndpointInject("mock:result")
     private MockEndpoint to;
 
     private Producer<String> producer;
 
+    private static int topicId = 0;
+
     public String getPulsarBrokerUrl() {
         return service.getPulsarBrokerUrl();
     }
@@ -77,16 +72,25 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
         context.removeRoute("myRoute");
         String producerName = this.getClass().getSimpleName() + TestUtils.randomWithRange(1, 100);
 
-        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(TOPIC_URI).create();
+        String topicUri = PulsarConsumerAcknowledgementTest.TOPIC_URI + ++topicId;
+        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(topicUri).create();
+
+        from = context.getEndpoint("pulsar:" + topicUri + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                                   + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+                                   + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000"
+                                   + "&negativeAckRedeliveryDelayMicros=100000");
+        to = context.getEndpoint("mock:result", MockEndpoint.class);
     }
 
     @AfterEach
-    public void tearDownProducer() {
+    public void tearDownProducer() throws Exception {
+        from.close();
         try {
             producer.close();
         } catch (PulsarClientException e) {
             LOGGER.warn("Failed to close client: {}", e.getMessage(), e);
         }
+        context.removeRoute("myRoute");
     }
 
     @Override
@@ -115,12 +119,13 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
 
     @Test
     public void testAcknowledge() throws Exception {
-        to.expectsNoDuplicates(body());
+        to.expectedMessageCount(1);
+        to.expectedBodiesReceived("testAcknowledge: Hello World!");
 
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() {
-                from(from).routeId("myRoute").to(to).process(exchange -> {
+                from(from).routeId("testAcknowledge:myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
                     PulsarMessageReceipt receipt
@@ -130,19 +135,20 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
             }
         });
 
-        producer.send("Hello World!");
+        producer.send("testAcknowledge: Hello World!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
     }
 
     @Test
     public void testAcknowledgeAsync() throws Exception {
-        to.expectsNoDuplicates(body());
+        to.expectedMessageCount(1);
+        to.expectedBodiesReceived("testAcknowledgeAsync: Hello World!");
 
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() {
-                from(from).routeId("myRoute").to(to).process(exchange -> {
+                from(from).routeId("testAcknowledgeAsync:myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
                     PulsarMessageReceipt receipt
@@ -157,51 +163,54 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
             }
         });
 
-        producer.send("Hello World!");
+        producer.send("testAcknowledgeAsync: Hello World!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
     }
 
     @Test
     public void testAcknowledgeCumulative() throws Exception {
-        to.expectsNoDuplicates(body());
+        to.expectedMessageCount(2);
+        to.expectedBodiesReceived("testAcknowledgeCumulative: Hello World!", "testAcknowledgeCumulative: Hello World Again!");
 
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() {
-                from(from).routeId("myRoute").to(to).process(exchange -> {
+                from(from).routeId("testAcknowledgeCumulative:myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
                     PulsarMessageReceipt receipt
                             = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     // Ack the second message. The first will also be acked.
-                    if (exchange.getIn().getBody().equals("Hello World Again!")) {
+                    if (exchange.getIn().getBody().equals("testAcknowledgeCumulative: Hello World Again!")) {
                         receipt.acknowledgeCumulative();
                     }
                 });
             }
         });
 
-        producer.send("Hello World!");
-        producer.send("Hello World Again!");
+        producer.send("testAcknowledgeCumulative: Hello World!");
+        producer.send("testAcknowledgeCumulative: Hello World Again!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
     }
 
     @Test
     public void testAcknowledgeCumulativeAsync() throws Exception {
-        to.expectsNoDuplicates(body());
+        to.expectedMessageCount(2);
+        to.expectedBodiesReceived("testAcknowledgeCumulativeAsync: Hello World!",
+                "testAcknowledgeCumulativeAsync: Hello World Again!");
 
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() {
-                from(from).routeId("myRoute").to(to).process(exchange -> {
+                from(from).routeId("testAcknowledgeCumulativeAsync:myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
                     PulsarMessageReceipt receipt
                             = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     // Ack the second message. The first will also be acked.
-                    if (exchange.getIn().getBody().equals("Hello World Again!")) {
+                    if (exchange.getIn().getBody().equals("testAcknowledgeCumulativeAsync: Hello World Again!")) {
                         try {
                             CompletableFuture<Void> f = receipt.acknowledgeCumulativeAsync();
                             f.get();
@@ -213,8 +222,8 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
             }
         });
 
-        producer.send("Hello World!");
-        producer.send("Hello World Again!");
+        producer.send("testAcknowledgeCumulativeAsync: Hello World!");
+        producer.send("testAcknowledgeCumulativeAsync: Hello World Again!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
     }
@@ -222,16 +231,16 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
     @Test
     public void testNegativeAcknowledge() throws Exception {
         to.expectedMessageCount(2);
-        to.expectedBodiesReceived("Hello World!", "Hello World!");
+        to.expectedBodiesReceived("testNegativeAcknowledge: Hello World!", "testNegativeAcknowledge: Hello World!");
 
+        AtomicBoolean processed = new AtomicBoolean();
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() {
-                from(from).routeId("myRoute").to(to).process(exchange -> {
+                from(from).routeId("testNegativeAcknowledge:myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    if (!Boolean.parseBoolean(exchange.getProperty("processedOnce", String.class))) {
-                        exchange.setProperty("processedOnce", "true");
+                    if (processed.compareAndSet(false, true)) {
                         PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
                                 .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                         receipt.negativeAcknowledge();
@@ -244,7 +253,7 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
             }
         });
 
-        producer.newMessage().value("Hello World!").property("processedOnce", "false").send();
+        producer.newMessage().value("testNegativeAcknowledge: Hello World!").send();
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
     }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
index a5a7ad7..31dd28c 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
@@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
     private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerDeadLetterPolicyTest.class);
-    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic-";
 
     @EndpointInject("mock:result")
     private MockEndpoint to;
@@ -51,6 +51,10 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
 
     private Producer<String> producer;
 
+    private static int topicId = 0;
+
+    private String topicUri;
+
     @Override
     protected Registry createCamelRegistry() throws Exception {
         Registry registry = new SimpleRegistry();
@@ -81,7 +85,8 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
         }
         String producerName = this.getClass().getSimpleName() + TestUtils.randomWithRange(1, 100);
 
-        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(TOPIC_URI).create();
+        topicUri = PulsarConsumerDeadLetterPolicyTest.TOPIC_URI + ++topicId;
+        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(topicUri).create();
     }
 
     @AfterEach
@@ -97,7 +102,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
     public void givenNoMaxRedeliverCountAndDeadLetterTopicverifyValuesAreNull() throws Exception {
         PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
 
-        PulsarEndpoint endpoint = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI);
+        PulsarEndpoint endpoint = (PulsarEndpoint) component.createEndpoint("pulsar:" + topicUri);
 
         assertNull(endpoint.getPulsarConfiguration().getMaxRedeliverCount());
         assertNull(endpoint.getPulsarConfiguration().getDeadLetterTopic());
@@ -109,9 +114,9 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
         PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
 
         PulsarEndpoint from = (PulsarEndpoint) component
-                .createEndpoint("pulsar:" + TOPIC_URI
+                .createEndpoint("pulsar:" + topicUri
                                 + "?maxRedeliverCount=5&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
-        PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI + "-subs-DLQ");
+        PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:" + topicUri + "-subs-DLQ");
 
         to.expectedMessageCount(5);
         deadLetter.expectedMessageCount(1);
@@ -134,7 +139,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
         PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
 
         PulsarEndpoint from = (PulsarEndpoint) component
-                .createEndpoint("pulsar:" + TOPIC_URI
+                .createEndpoint("pulsar:" + topicUri
                                 + "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
         PulsarEndpoint deadLetterFrom
                 = (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic");
@@ -159,7 +164,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
         PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
 
         PulsarEndpoint from = (PulsarEndpoint) component
-                .createEndpoint("pulsar:" + TOPIC_URI
+                .createEndpoint("pulsar:" + topicUri
                                 + "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
         PulsarEndpoint deadLetterFrom
                 = (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic");