You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2021/02/15 08:09:25 UTC
[camel] 02/11: Fix failing pulsar unit test
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5386cc0fa39d3930250480636374563f993c7c21
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Feb 5 09:16:20 2021 +0100
Fix failing pulsar unit test
---
.../pulsar/PulsarConsumerAcknowledgementTest.java | 73 ++++++++++++----------
.../pulsar/PulsarConsumerDeadLetterPolicyTest.java | 19 +++---
2 files changed, 53 insertions(+), 39 deletions(-)
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 3019e8c..f0c3eb5 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -18,9 +18,9 @@ package org.apache.camel.component.pulsar;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.pulsar.utils.AutoConfiguration;
@@ -43,27 +43,22 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.test.junit5.TestSupport.body;
-
public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
@RegisterExtension
static PulsarService service = PulsarServiceFactory.createService();
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerAcknowledgementTest.class);
- private static final String TOPIC_URI = "persistent://public/default/camel-topic-1";
+ private static final String TOPIC_URI = "persistent://public/default/camel-topic-";
- @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
- + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
- + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000"
- + "&negativeAckRedeliveryDelayMicros=100000")
private Endpoint from;
- @EndpointInject("mock:result")
private MockEndpoint to;
private Producer<String> producer;
+ private static int topicId = 0;
+
public String getPulsarBrokerUrl() {
return service.getPulsarBrokerUrl();
}
@@ -77,16 +72,25 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
context.removeRoute("myRoute");
String producerName = this.getClass().getSimpleName() + TestUtils.randomWithRange(1, 100);
- producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(TOPIC_URI).create();
+ String topicUri = PulsarConsumerAcknowledgementTest.TOPIC_URI + ++topicId;
+ producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(topicUri).create();
+
+ from = context.getEndpoint("pulsar:" + topicUri + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+ + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000"
+ + "&negativeAckRedeliveryDelayMicros=100000");
+ to = context.getEndpoint("mock:result", MockEndpoint.class);
}
@AfterEach
- public void tearDownProducer() {
+ public void tearDownProducer() throws Exception {
+ from.close();
try {
producer.close();
} catch (PulsarClientException e) {
LOGGER.warn("Failed to close client: {}", e.getMessage(), e);
}
+ context.removeRoute("myRoute");
}
@Override
@@ -115,12 +119,13 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
@Test
public void testAcknowledge() throws Exception {
- to.expectsNoDuplicates(body());
+ to.expectedMessageCount(1);
+ to.expectedBodiesReceived("testAcknowledge: Hello World!");
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
- from(from).routeId("myRoute").to(to).process(exchange -> {
+ from(from).routeId("testAcknowledge:myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
PulsarMessageReceipt receipt
@@ -130,19 +135,20 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
}
});
- producer.send("Hello World!");
+ producer.send("testAcknowledge: Hello World!");
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
}
@Test
public void testAcknowledgeAsync() throws Exception {
- to.expectsNoDuplicates(body());
+ to.expectedMessageCount(1);
+ to.expectedBodiesReceived("testAcknowledgeAsync: Hello World!");
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
- from(from).routeId("myRoute").to(to).process(exchange -> {
+ from(from).routeId("testAcknowledgeAsync:myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
PulsarMessageReceipt receipt
@@ -157,51 +163,54 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
}
});
- producer.send("Hello World!");
+ producer.send("testAcknowledgeAsync: Hello World!");
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
}
@Test
public void testAcknowledgeCumulative() throws Exception {
- to.expectsNoDuplicates(body());
+ to.expectedMessageCount(2);
+ to.expectedBodiesReceived("testAcknowledgeCumulative: Hello World!", "testAcknowledgeCumulative: Hello World Again!");
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
- from(from).routeId("myRoute").to(to).process(exchange -> {
+ from(from).routeId("testAcknowledgeCumulative:myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
PulsarMessageReceipt receipt
= (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
// Ack the second message. The first will also be acked.
- if (exchange.getIn().getBody().equals("Hello World Again!")) {
+ if (exchange.getIn().getBody().equals("testAcknowledgeCumulative: Hello World Again!")) {
receipt.acknowledgeCumulative();
}
});
}
});
- producer.send("Hello World!");
- producer.send("Hello World Again!");
+ producer.send("testAcknowledgeCumulative: Hello World!");
+ producer.send("testAcknowledgeCumulative: Hello World Again!");
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
}
@Test
public void testAcknowledgeCumulativeAsync() throws Exception {
- to.expectsNoDuplicates(body());
+ to.expectedMessageCount(2);
+ to.expectedBodiesReceived("testAcknowledgeCumulativeAsync: Hello World!",
+ "testAcknowledgeCumulativeAsync: Hello World Again!");
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
- from(from).routeId("myRoute").to(to).process(exchange -> {
+ from(from).routeId("testAcknowledgeCumulativeAsync:myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
PulsarMessageReceipt receipt
= (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
// Ack the second message. The first will also be acked.
- if (exchange.getIn().getBody().equals("Hello World Again!")) {
+ if (exchange.getIn().getBody().equals("testAcknowledgeCumulativeAsync: Hello World Again!")) {
try {
CompletableFuture<Void> f = receipt.acknowledgeCumulativeAsync();
f.get();
@@ -213,8 +222,8 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
}
});
- producer.send("Hello World!");
- producer.send("Hello World Again!");
+ producer.send("testAcknowledgeCumulativeAsync: Hello World!");
+ producer.send("testAcknowledgeCumulativeAsync: Hello World Again!");
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
}
@@ -222,16 +231,16 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
@Test
public void testNegativeAcknowledge() throws Exception {
to.expectedMessageCount(2);
- to.expectedBodiesReceived("Hello World!", "Hello World!");
+ to.expectedBodiesReceived("testNegativeAcknowledge: Hello World!", "testNegativeAcknowledge: Hello World!");
+ AtomicBoolean processed = new AtomicBoolean();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
- from(from).routeId("myRoute").to(to).process(exchange -> {
+ from(from).routeId("testNegativeAcknowledge:myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
- if (!Boolean.parseBoolean(exchange.getProperty("processedOnce", String.class))) {
- exchange.setProperty("processedOnce", "true");
+ if (processed.compareAndSet(false, true)) {
PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
.getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
receipt.negativeAcknowledge();
@@ -244,7 +253,7 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport {
}
});
- producer.newMessage().value("Hello World!").property("processedOnce", "false").send();
+ producer.newMessage().value("testNegativeAcknowledge: Hello World!").send();
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
index a5a7ad7..31dd28c 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
@@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerDeadLetterPolicyTest.class);
- private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+ private static final String TOPIC_URI = "persistent://public/default/camel-topic-";
@EndpointInject("mock:result")
private MockEndpoint to;
@@ -51,6 +51,10 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
private Producer<String> producer;
+ private static int topicId = 0;
+
+ private String topicUri;
+
@Override
protected Registry createCamelRegistry() throws Exception {
Registry registry = new SimpleRegistry();
@@ -81,7 +85,8 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
}
String producerName = this.getClass().getSimpleName() + TestUtils.randomWithRange(1, 100);
- producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(TOPIC_URI).create();
+ topicUri = PulsarConsumerDeadLetterPolicyTest.TOPIC_URI + ++topicId;
+ producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(topicUri).create();
}
@AfterEach
@@ -97,7 +102,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
public void givenNoMaxRedeliverCountAndDeadLetterTopicverifyValuesAreNull() throws Exception {
PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
- PulsarEndpoint endpoint = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI);
+ PulsarEndpoint endpoint = (PulsarEndpoint) component.createEndpoint("pulsar:" + topicUri);
assertNull(endpoint.getPulsarConfiguration().getMaxRedeliverCount());
assertNull(endpoint.getPulsarConfiguration().getDeadLetterTopic());
@@ -109,9 +114,9 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
PulsarEndpoint from = (PulsarEndpoint) component
- .createEndpoint("pulsar:" + TOPIC_URI
+ .createEndpoint("pulsar:" + topicUri
+ "?maxRedeliverCount=5&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
- PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI + "-subs-DLQ");
+ PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:" + topicUri + "-subs-DLQ");
to.expectedMessageCount(5);
deadLetter.expectedMessageCount(1);
@@ -134,7 +139,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
PulsarEndpoint from = (PulsarEndpoint) component
- .createEndpoint("pulsar:" + TOPIC_URI
+ .createEndpoint("pulsar:" + topicUri
+ "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
PulsarEndpoint deadLetterFrom
= (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic");
@@ -159,7 +164,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
PulsarEndpoint from = (PulsarEndpoint) component
- .createEndpoint("pulsar:" + TOPIC_URI
+ .createEndpoint("pulsar:" + topicUri
+ "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
PulsarEndpoint deadLetterFrom
= (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic");