You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/07/22 03:39:49 UTC

[pulsar] branch master updated: [fix][test] Fix testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription (#16703)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b0149650838 [fix][test] Fix testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription (#16703)
b0149650838 is described below

commit b01496508388c73eacd25c51917524a81c55ab95
Author: Zike Yang <zi...@apache.org>
AuthorDate: Fri Jul 22 11:39:39 2022 +0800

    [fix][test] Fix testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription (#16703)
    
    ### Motivation
    
    The receiver queue size is too small, which causes the entry filter to trigger message redelivery more frequently. Because there is a default interval of 1 second when delivering messages, these repeated redeliveries tend to make the whole test take too long, leading to a timeout exception.
    
    ### Modifications
    
    * Increase the receiver queue size
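    * Fix some problems in the test: the consumer2 task now reports failures on `resultConsume2` instead of `resultConsume1`, `numMessages` is declared up front so it can size the receiver queues, and long lines and indentation are cleaned up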
---
 .../broker/service/plugin/FilterEntryTest.java     | 55 +++++++++++-----------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index b2edbda8855..6d77d06da01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -229,6 +229,7 @@ public class FilterEntryTest extends BrokerTestBase {
         Map<String, String> metadataConsumer2 = new HashMap<>();
         metadataConsumer2.put("matchValueAccept", "FOR-2");
         metadataConsumer2.put("matchValueReschedule", "FOR-1");
+        final int numMessages = 200;
 
         try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .enableBatching(false).topic(topic).create();
@@ -236,7 +237,7 @@ public class FilterEntryTest extends BrokerTestBase {
                      .subscriptionType(SubscriptionType.Shared)
                      .properties(metadataConsumer1)
                      .consumerName("consumer1")
-                     .receiverQueueSize(5)
+                     .receiverQueueSize(numMessages / 2)
                      .subscriptionName(subName)
                      .subscribe();
              Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
@@ -244,7 +245,7 @@ public class FilterEntryTest extends BrokerTestBase {
                      .properties(metadataConsumer2)
                      .consumerName("consumer2")
                      .topic(topic)
-                     .receiverQueueSize(5)
+                     .receiverQueueSize(numMessages / 2)
                      .subscriptionName(subName)
                      .subscribe()) {
 
@@ -256,12 +257,13 @@ public class FilterEntryTest extends BrokerTestBase {
             field.setAccessible(true);
             NarClassLoader narClassLoader = mock(NarClassLoader.class);
             EntryFilter filter1 = new EntryFilterTest();
-            EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+            EntryFilterWithClassLoader loader1 =
+                    spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
             EntryFilter filter2 = new EntryFilterTest();
-            EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
+            EntryFilterWithClassLoader loader2 =
+                    spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
             field.set(dispatcher, ImmutableList.of(loader1, loader2));
 
-            int numMessages = 200;
             for (int i = 0; i < numMessages; i++) {
                 if (i % 2 == 0) {
                     producer.newMessage()
@@ -275,30 +277,29 @@ public class FilterEntryTest extends BrokerTestBase {
                             .send();
                 }
             }
-
             CompletableFuture<Void> resultConsume1 = new CompletableFuture<>();
             pulsar.getExecutor().submit(() -> {
-                        try {
-                            // assert that the consumer1 receive all the messages and that such messages
-                            // are for consumer1
-                            int counter = 0;
-                            while (counter < numMessages / 2) {
-                                Message<String> message = consumer1.receive(1, TimeUnit.MINUTES);
-                                if (message != null) {
-                                    log.info("received1 {} - {}", message.getValue(), message.getProperties());
-                                    counter++;
-                                    assertEquals("consumer-1", message.getValue());
-                                    consumer1.acknowledgeAsync(message);
-                                } else {
-                                    resultConsume1.completeExceptionally(
-                                            new Exception("consumer1 did not receive all the messages"));
-                                }
-                            }
-                            resultConsume1.complete(null);
-                        } catch (Throwable err) {
-                            resultConsume1.completeExceptionally(err);
+                try {
+                    // assert that the consumer1 receive all the messages and that such messages
+                    // are for consumer1
+                    int counter = 0;
+                    while (counter < numMessages / 2) {
+                        Message<String> message = consumer1.receive(1, TimeUnit.MINUTES);
+                        if (message != null) {
+                            log.info("received1 {} - {}", message.getValue(), message.getProperties());
+                            counter++;
+                            assertEquals("consumer-1", message.getValue());
+                            consumer1.acknowledgeAsync(message);
+                        } else {
+                            resultConsume1.completeExceptionally(
+                                    new Exception("consumer1 did not receive all the messages"));
                         }
-                    });
+                    }
+                    resultConsume1.complete(null);
+                } catch (Throwable err) {
+                    resultConsume1.completeExceptionally(err);
+                }
+            });
 
             CompletableFuture<Void> resultConsume2 = new CompletableFuture<>();
             pulsar.getExecutor().submit(() -> {
@@ -320,7 +321,7 @@ public class FilterEntryTest extends BrokerTestBase {
                     }
                     resultConsume2.complete(null);
                 } catch (Throwable err) {
-                    resultConsume1.completeExceptionally(err);
+                    resultConsume2.completeExceptionally(err);
                 }
             });
             resultConsume1.get(1, TimeUnit.MINUTES);