You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/07/22 03:39:49 UTC
[pulsar] branch master updated: [fix][test] Fix testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription (#16703)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b0149650838 [fix][test] Fix testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription (#16703)
b0149650838 is described below
commit b01496508388c73eacd25c51917524a81c55ab95
Author: Zike Yang <zi...@apache.org>
AuthorDate: Fri Jul 22 11:39:39 2022 +0800
[fix][test] Fix testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription (#16703)
### Motivation
The receiver queue size is too small, which causes the entry filter to trigger message redelivery more frequently. However, there is a default interval of 1 second when delivering messages, so the frequent redeliveries tend to make the whole test take too long, leading to a timeout exception.
### Modifications
* Increase the receiver queue size
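* Fix some problems in the test: declare `numMessages` before the consumers are created so it can be used to size their receiver queues, and complete `resultConsume2` (instead of `resultConsume1`) when the consumer2 task fails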
---
.../broker/service/plugin/FilterEntryTest.java | 55 +++++++++++-----------
1 file changed, 28 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index b2edbda8855..6d77d06da01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -229,6 +229,7 @@ public class FilterEntryTest extends BrokerTestBase {
Map<String, String> metadataConsumer2 = new HashMap<>();
metadataConsumer2.put("matchValueAccept", "FOR-2");
metadataConsumer2.put("matchValueReschedule", "FOR-1");
+ final int numMessages = 200;
try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topic).create();
@@ -236,7 +237,7 @@ public class FilterEntryTest extends BrokerTestBase {
.subscriptionType(SubscriptionType.Shared)
.properties(metadataConsumer1)
.consumerName("consumer1")
- .receiverQueueSize(5)
+ .receiverQueueSize(numMessages / 2)
.subscriptionName(subName)
.subscribe();
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
@@ -244,7 +245,7 @@ public class FilterEntryTest extends BrokerTestBase {
.properties(metadataConsumer2)
.consumerName("consumer2")
.topic(topic)
- .receiverQueueSize(5)
+ .receiverQueueSize(numMessages / 2)
.subscriptionName(subName)
.subscribe()) {
@@ -256,12 +257,13 @@ public class FilterEntryTest extends BrokerTestBase {
field.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
- EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+ EntryFilterWithClassLoader loader1 =
+ spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
EntryFilter filter2 = new EntryFilterTest();
- EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
+ EntryFilterWithClassLoader loader2 =
+ spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
field.set(dispatcher, ImmutableList.of(loader1, loader2));
- int numMessages = 200;
for (int i = 0; i < numMessages; i++) {
if (i % 2 == 0) {
producer.newMessage()
@@ -275,30 +277,29 @@ public class FilterEntryTest extends BrokerTestBase {
.send();
}
}
-
CompletableFuture<Void> resultConsume1 = new CompletableFuture<>();
pulsar.getExecutor().submit(() -> {
- try {
- // assert that the consumer1 receive all the messages and that such messages
- // are for consumer1
- int counter = 0;
- while (counter < numMessages / 2) {
- Message<String> message = consumer1.receive(1, TimeUnit.MINUTES);
- if (message != null) {
- log.info("received1 {} - {}", message.getValue(), message.getProperties());
- counter++;
- assertEquals("consumer-1", message.getValue());
- consumer1.acknowledgeAsync(message);
- } else {
- resultConsume1.completeExceptionally(
- new Exception("consumer1 did not receive all the messages"));
- }
- }
- resultConsume1.complete(null);
- } catch (Throwable err) {
- resultConsume1.completeExceptionally(err);
+ try {
+ // assert that the consumer1 receive all the messages and that such messages
+ // are for consumer1
+ int counter = 0;
+ while (counter < numMessages / 2) {
+ Message<String> message = consumer1.receive(1, TimeUnit.MINUTES);
+ if (message != null) {
+ log.info("received1 {} - {}", message.getValue(), message.getProperties());
+ counter++;
+ assertEquals("consumer-1", message.getValue());
+ consumer1.acknowledgeAsync(message);
+ } else {
+ resultConsume1.completeExceptionally(
+ new Exception("consumer1 did not receive all the messages"));
}
- });
+ }
+ resultConsume1.complete(null);
+ } catch (Throwable err) {
+ resultConsume1.completeExceptionally(err);
+ }
+ });
CompletableFuture<Void> resultConsume2 = new CompletableFuture<>();
pulsar.getExecutor().submit(() -> {
@@ -320,7 +321,7 @@ public class FilterEntryTest extends BrokerTestBase {
}
resultConsume2.complete(null);
} catch (Throwable err) {
- resultConsume1.completeExceptionally(err);
+ resultConsume2.completeExceptionally(err);
}
});
resultConsume1.get(1, TimeUnit.MINUTES);