Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/23 11:38:00 UTC

[GitHub] [pulsar] mattisonchao opened a new pull request, #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

mattisonchao opened a new pull request, #17237:
URL: https://github.com/apache/pulsar/pull/17237

   ### Motivation
   
   Before `readMoreEntries` issues a read, it has to check whether `havePendingRead` is already `true`. Without that check, a race condition can cause the same position to be read many times.
   
   To see the problem, add `invocationCount` to the `@Test` annotation so that the new test runs multiple times.
   
   The relevant logs:
   
   ```
   2022-08-23T19:10:01,016 - INFO  - [pulsar-web-191-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [23/Aug/2022:19:10:01 +0800] "GET /admin/v2/namespaces/my-property/throttling_ns/subscriptionDispatchRate HTTP/1.1" 200 124 "-" "Pulsar-Java-v2.11.0-SNAPSHOT" 2
   2022-08-23T19:10:01,019 - INFO  - [pulsar-web-191-14:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [23/Aug/2022:19:10:01 +0800] "GET /admin/v2/namespaces/my-property/throttling_ns/dispatchRate HTTP/1.1" 200 123 "-" "Pulsar-Java-v2.11.0-SNAPSHOT" 2
   2022-08-23T19:10:01,021 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:01,021 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:0
   2022-08-23T19:10:01,021 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:01,023 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [0] in the listener
   2022-08-23T19:10:01,036 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:01,036 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:1
   2022-08-23T19:10:01,036 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:2
   2022-08-23T19:10:01,036 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:3
   2022-08-23T19:10:01,036 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:4
   2022-08-23T19:10:01,036 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:5
   2022-08-23T19:10:01,036 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:01,037 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [1] in the listener
   2022-08-23T19:10:01,037 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [2] in the listener
   2022-08-23T19:10:01,037 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [3] in the listener
   2022-08-23T19:10:01,037 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [4] in the listener
   2022-08-23T19:10:01,037 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [5] in the listener
   2022-08-23T19:10:01,037 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:01,037 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:6
   2022-08-23T19:10:01,037 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:01,038 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:01,038 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:7
   2022-08-23T19:10:01,038 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:01,038 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [6] in the listener
   2022-08-23T19:10:01,038 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [7] in the listener
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:8
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:9
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:10
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:11
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:12
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:13
   2022-08-23T19:10:02,039 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:14
   2022-08-23T19:10:02,040 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:15
   2022-08-23T19:10:02,040 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:16
   2022-08-23T19:10:02,040 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:17
   2022-08-23T19:10:02,040 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [8] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [9] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [10] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [11] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [12] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [13] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [14] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [15] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [16] in the listener
   2022-08-23T19:10:02,041 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [17] in the listener
   2022-08-23T19:10:03,042 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:03,042 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:18
   2022-08-23T19:10:03,042 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:03,043 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:03,043 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:18
   2022-08-23T19:10:03,043 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:03,043 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:03,043 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:19
   2022-08-23T19:10:03,043 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [18] in the listener
   2022-08-23T19:10:03,044 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:03,044 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [18] in the listener
   2022-08-23T19:10:03,044 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [19] in the listener
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:20
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:21
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:22
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:23
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:20
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:21
   2022-08-23T19:10:04,045 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:22
   2022-08-23T19:10:04,046 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:23
   2022-08-23T19:10:04,046 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:04,046 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [20] in the listener
   2022-08-23T19:10:04,047 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [21] in the listener
   2022-08-23T19:10:04,047 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [22] in the listener
   2022-08-23T19:10:04,047 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [23] in the listener
   2022-08-23T19:10:04,047 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [20] in the listener
   2022-08-23T19:10:04,047 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [21] in the listener
   2022-08-23T19:10:04,047 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [22] in the listener
   2022-08-23T19:10:04,047 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [23] in the listener
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:24
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:25
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:26
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:24
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:25
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:26
   2022-08-23T19:10:05,048 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:05,049 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [24] in the listener
   2022-08-23T19:10:05,049 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [25] in the listener
   2022-08-23T19:10:05,049 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [26] in the listener
   2022-08-23T19:10:05,049 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [24] in the listener
   2022-08-23T19:10:05,049 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [25] in the listener
   2022-08-23T19:10:05,050 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [26] in the listener
   2022-08-23T19:10:05,659 - INFO  - [pulsar-load-manager-163-1:ModularLoadManagerImpl@466] - Writing local data to metadata store because maximum change 55.46875% exceeded threshold 10%; time since last report written is 5.02 seconds. ResourceUsage:[cpu: 11.06%, memory: 55.47%, directMemory: 9.38%, bandwidthIn: 0.00%, bandwidthOut: 0.00%]
   2022-08-23T19:10:05,661 - INFO  - [metadata-store-171-1:ResourceLockImpl@165] - Acquired resource lock on /loadbalance/brokers/localhost:52221
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:27
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:28
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:29
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@162] - ============ before dispatch
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:27
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:28
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@164] - dispatch message to consumer ledger:3, entry:29
   2022-08-23T19:10:06,053 - INFO  - [broker-topic-workers-OrderedExecutor-7-0:PersistentDispatcherSingleActiveConsumer@166] - ============ after dispatch
   2022-08-23T19:10:06,056 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [27] in the listener
   2022-08-23T19:10:06,056 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [28] in the listener
   2022-08-23T19:10:06,056 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [29] in the listener
   2022-08-23T19:10:06,056 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [27] in the listener
   2022-08-23T19:10:06,056 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [28] in the listener
   2022-08-23T19:10:06,056 - INFO  - [pulsar-external-listener-197-1:SubscriptionMessageDispatchThrottlingTest@346] - Received message [29] in the listener
   
   java.lang.AssertionError: 
   Expected :30.0
   Actual   :41.0
   <Click to see difference>
   
   
   	at org.testng.Assert.fail(Assert.java:99)
   	at org.testng.Assert.failNotEquals(Assert.java:1037)
   	at org.testng.Assert.assertEquals(Assert.java:744)
   	at org.testng.Assert.assertEquals(Assert.java:757)
   	at org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest.testDispatchRate(SubscriptionMessageDispatchThrottlingTest.java:390)
   	at org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest.testMultiLevelDispatch(SubscriptionMessageDispatchThrottlingTest.java:420)
   	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
   	at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
   	at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
   	at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
   	at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
   	at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
   	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
   	at org.testng.TestRunner.privateRun(TestRunner.java:764)
   	at org.testng.TestRunner.run(TestRunner.java:585)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
   	at org.testng.TestNG.runSuites(TestNG.java:1069)
   	at org.testng.TestNG.run(TestNG.java:1037)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
   ```
   
   ### Modifications
   
   - Add the `synchronized` keyword and a `havePendingRead` check to avoid reading the same position many times.
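
   The effect of the guard can be illustrated with a small toy model. This is only a sketch and not the Pulsar implementation: the class `GuardedDispatcherSketch` and the methods `completePendingReads` and `simulate` are hypothetical, and only the names `readMoreEntries` and `havePendingRead` are borrowed from this PR. It assumes that a read captures the position at the moment it is issued.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /**
    * Toy model of a single-consumer dispatcher. Not Pulsar code: the class,
    * fields, and helper methods are hypothetical and only mirror names from the PR.
    */
   class GuardedDispatcherSketch {
       private final boolean useGuard;
       private boolean havePendingRead = false;  // true while a read is in flight
       private long readPosition = 0;            // next entry to read
       private final List<Long> pendingReads = new ArrayList<>();
       final List<Long> dispatched = new ArrayList<>();

       GuardedDispatcherSketch(boolean useGuard) {
           this.useGuard = useGuard;
       }

       /** Issue a read for the current position unless one is already in flight. */
       synchronized void readMoreEntries() {
           if (useGuard && havePendingRead) {
               return; // a read for this position is already outstanding
           }
           havePendingRead = true;
           pendingReads.add(readPosition); // position is captured when the read is issued
       }

       /** Complete every outstanding read and dispatch the entry it fetched. */
       synchronized void completePendingReads() {
           for (long position : pendingReads) {
               dispatched.add(position);
               readPosition = Math.max(readPosition, position + 1);
           }
           pendingReads.clear();
           havePendingRead = false;
       }

       /** Two read requests arrive before the first read completes. */
       static List<Long> simulate(boolean useGuard) {
           GuardedDispatcherSketch dispatcher = new GuardedDispatcherSketch(useGuard);
           dispatcher.readMoreEntries();
           dispatcher.readMoreEntries(); // e.g. a rescheduled read firing again
           dispatcher.completePendingReads();
           dispatcher.readMoreEntries();
           dispatcher.completePendingReads();
           return dispatcher.dispatched;
       }
   }
   ```

   In this model, `simulate(false)` dispatches entry 0 twice (`[0, 0, 1]`), while `simulate(true)` dispatches each entry once (`[0, 1]`).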
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1225282433

   After testing, I think the problem is caused by `reScheduleRead` probably being invoked many times, and `readMoreEntries` also has no `havePendingRead` guard.
   So, we may read the same position many times.
   
   Even if I fix the repeated `reScheduleRead` invocations, I still think we need to add a `havePendingRead` guard to the `readMoreEntries` method.
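
   One possible way to keep repeated reschedule requests from piling up is to coalesce them with an atomic flag. The following is a minimal sketch under that assumption, not the change made in this PR: `RescheduleOnceSketch`, `rescheduleInProgress`, `readsTriggered`, and `demo` are hypothetical names, and the counter stands in for a call to `readMoreEntries`.

   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Hypothetical helper, not Pulsar code: coalesces repeated reschedule requests. */
   class RescheduleOnceSketch {
       private final ScheduledExecutorService executor;
       private final AtomicBoolean rescheduleInProgress = new AtomicBoolean(false);
       final AtomicInteger readsTriggered = new AtomicInteger();

       RescheduleOnceSketch(ScheduledExecutorService executor) {
           this.executor = executor;
       }

       /** Schedule a delayed read only if none is already scheduled. */
       void reScheduleRead(long delayMillis) {
           if (rescheduleInProgress.compareAndSet(false, true)) {
               executor.schedule(() -> {
                   rescheduleInProgress.set(false);  // allow the next reschedule
                   readsTriggered.incrementAndGet(); // stands in for readMoreEntries()
               }, delayMillis, TimeUnit.MILLISECONDS);
           }
       }

       /** Request a reschedule several times in a row and count the reads that run. */
       static int demo(int requests) {
           ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
           RescheduleOnceSketch sketch = new RescheduleOnceSketch(executor);
           for (int i = 0; i < requests; i++) {
               sketch.reScheduleRead(300);
           }
           // By default, already-scheduled delayed tasks still run after shutdown().
           executor.shutdown();
           try {
               executor.awaitTermination(5, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return sketch.readsTriggered.get();
       }
   }
   ```

   As long as all requests arrive before the delayed task runs, only the first one schedules a read. This reduces redundant reads, but it does not replace the `havePendingRead` guard, because a read can still be requested from other code paths.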




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r952765957


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }

Review Comment:
   The comment says `Asynchronously produce messages`, but the code actually uses the synchronous `send` method.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);

Review Comment:
   Does the bug only happen when the dispatch rate limit is enabled?



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)

Review Comment:
   @mattisonchao If the test can't reproduce the issue reliably, we can also add `invocationCount` so that the test has to pass multiple times in the CI environment.





[GitHub] [pulsar] lhotari commented on a diff in pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r955786926


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);

Review Comment:
   This is flaky. Since the comment says "please alert", I'm alerting. :)
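To tell duplicated deliveries apart from a simple miscount, a listener could record which entries it has seen instead of only counting arrivals. The following is a minimal sketch under stated assumptions: `DuplicateTrackerSketch` and its methods are hypothetical names, not part of the Pulsar test, and the `(ledgerId, entryId)` key is a stand-in for whatever identifier the real listener would derive from `msg.getMessageId()`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not Pulsar code): remembers which entries were delivered. */
public class DuplicateTrackerSketch {
    // Thread-safe set, because a message listener may run on a client thread.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final AtomicInteger duplicates = new AtomicInteger();

    /** Returns true the first time a (ledgerId, entryId) pair is recorded, false on a repeat. */
    public boolean record(long ledgerId, long entryId) {
        boolean firstDelivery = seen.add(ledgerId + ":" + entryId);
        if (!firstDelivery) {
            duplicates.incrementAndGet();
        }
        return firstDelivery;
    }

    public int uniqueCount() {
        return seen.size();
    }

    public int duplicateCount() {
        return duplicates.get();
    }

    public static void main(String[] args) {
        DuplicateTrackerSketch tracker = new DuplicateTrackerSketch();
        // Illustrative pattern: entries 15..20 of ledger 3 each arrive twice.
        for (int round = 0; round < 2; round++) {
            for (long entry = 15; entry <= 20; entry++) {
                tracker.record(3, entry);
            }
        }
        System.out.println("unique=" + tracker.uniqueCount()
                + " duplicates=" + tracker.duplicateCount());
    }
}
```

With such a tracker, an assertion on `duplicateCount()` would fail only on a real duplicate, whereas a count comparison with a tolerance can hide or blur one. It does not remove the race itself.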





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r953408673


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -375,19 +385,16 @@ protected void readMoreEntries(Consumer consumer) {
 
     @Override
     protected void reScheduleRead() {
-        topic.getBrokerService().executor().schedule(() -> {
-            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            if (currentConsumer != null && !havePendingRead) {
-                readMoreEntries(currentConsumer);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
-                                    + " havePendingRead {}",
-                            topic.getName(), currentConsumer, havePendingRead);
-                }
+        if (isRescheduleReadInProgress.compareAndSet(false, true)) {

Review Comment:
   We have to avoid invoking `reScheduleRead` many times; see #16241.
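The `compareAndSet(false, true)` guard in the diff above can be illustrated in isolation. This is a minimal sketch, not the broker code: `RescheduleGuardSketch`, the injected `scheduler`, and the `reads` counter are hypothetical, and resetting the flag at the start of the scheduled task is an assumption, since the rest of the changed method is not shown in the hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical stand-in for a dispatcher that must not queue overlapping re-reads. */
public class RescheduleGuardSketch {
    private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
    private final AtomicInteger reads = new AtomicInteger();
    // Injected so the example is deterministic; a real dispatcher would use a timed executor.
    private final Consumer<Runnable> scheduler;

    public RescheduleGuardSketch(Consumer<Runnable> scheduler) {
        this.scheduler = scheduler;
    }

    /** Schedules a re-read only if none is pending; returns whether a task was scheduled. */
    public boolean reScheduleRead() {
        if (!isRescheduleReadInProgress.compareAndSet(false, true)) {
            return false; // another caller already scheduled the re-read
        }
        scheduler.accept(() -> {
            // Assumption: the flag is cleared when the scheduled task starts running.
            isRescheduleReadInProgress.set(false);
            reads.incrementAndGet(); // placeholder for the actual read
        });
        return true;
    }

    public int reads() {
        return reads.get();
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>();
        RescheduleGuardSketch dispatcher = new RescheduleGuardSketch(pending::add);
        for (int i = 0; i < 3; i++) {
            dispatcher.reScheduleRead();
        }
        System.out.println("scheduled tasks: " + pending.size());
        pending.get(0).run();
        System.out.println("reads performed: " + dispatcher.reads());
    }
}
```

Only the caller that wins the compare-and-set registers a task, so repeated calls made while a re-read is pending collapse into a single scheduled read.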





[GitHub] [pulsar] mattisonchao commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1224189584

   ```
   2022-08-23T22:44:06,869+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:0
   2022-08-23T22:44:06,869+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:06,882+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:1
   2022-08-23T22:44:06,882+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:2
   2022-08-23T22:44:06,882+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:3
   2022-08-23T22:44:06,882+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:4
   2022-08-23T22:44:06,882+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:5
   2022-08-23T22:44:06,882+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:6
   2022-08-23T22:44:06,882+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:06,883+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:7
   2022-08-23T22:44:06,883+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:07,884+0800 [pulsar-io-766-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:07,885+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:8
   2022-08-23T22:44:07,885+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:9
   2022-08-23T22:44:07,885+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:10
   2022-08-23T22:44:07,885+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:11
   2022-08-23T22:44:07,885+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:12
   2022-08-23T22:44:07,885+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:13
   2022-08-23T22:44:07,885+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:07,886+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:14
   2022-08-23T22:44:07,886+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:07,886+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= internalConsumerFlow ===========
   2022-08-23T22:44:07,886+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= internalConsumerFlow ===========
   2022-08-23T22:44:08,887+0800 [pulsar-io-766-5] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:08,887+0800 [pulsar-io-766-4] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:15
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:16
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:17
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:18
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:19
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:20
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:15
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:16
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:17
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:18
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:19
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:20
   2022-08-23T22:44:08,888+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:08,889+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:08,889+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= internalConsumerFlow ===========
   2022-08-23T22:44:08,889+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= internalConsumerFlow ===========
   2022-08-23T22:44:09,890+0800 [pulsar-io-766-6] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:09,890+0800 [pulsar-io-766-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:09,890+0800 [pulsar-io-766-8] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:09,890+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:21
   2022-08-23T22:44:09,890+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:21
   2022-08-23T22:44:09,890+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:21
   2022-08-23T22:44:09,890+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:09,890+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:22
   2022-08-23T22:44:09,890+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:09,890+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:09,891+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= internalConsumerFlow ===========
   2022-08-23T22:44:10,891+0800 [pulsar-io-766-10] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:10,891+0800 [pulsar-io-766-11] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:10,891+0800 [pulsar-io-766-9] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:23
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:24
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:25
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:26
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:23
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:24
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:25
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:26
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:23
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:24
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:25
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - dispatch message ledger:3 entry:26
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:10,892+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:10,893+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= Dispatcher complete ===========
   2022-08-23T22:44:10,893+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= internalConsumerFlow ===========
   2022-08-23T22:44:10,893+0800 [broker-topic-workers-OrderedExecutor-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= internalConsumerFlow ===========
   2022-08-23T22:44:11,556+0800 [pulsar-load-manager-763-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Writing local data to metadata store because maximum change 19.620104134082794% exceeded threshold 10%; time since last report written is 5.019 seconds
   2022-08-23T22:44:11,560+0800 [metadata-store-771-1] INFO  org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl - Acquired resource lock on /loadbalance/brokers/localhost:56274
   2022-08-23T22:44:11,893+0800 [pulsar-io-766-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:11,894+0800 [pulsar-io-766-13] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   2022-08-23T22:44:11,894+0800 [pulsar-io-766-14] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - ============= reScheduleRead ===========
   
   java.lang.AssertionError: 
   Expected :30.0
   Actual   :43.0
   <Click to see difference>
   
   
   	at org.testng.Assert.fail(Assert.java:99)
   	at org.testng.Assert.failNotEquals(Assert.java:1037)
   	at org.testng.Assert.assertEquals(Assert.java:744)
   	at org.testng.Assert.assertEquals(Assert.java:757)
   	at org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest.testMessageNotDuplicated(SubscriptionMessageDispatchThrottlingTest.java:216)
   	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
   	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
   	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
   	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
   	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
   	at java.base/java.lang.Thread.run(Thread.java:833)
   ```
   From this log, we can see that `internalConsumerFlow` and `reScheduleRead` are invoked many times concurrently. `reScheduleRead` may also have a problem, because we register many scheduled tasks through it. I will try to fix that in another PR.
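The effect of checking `havePendingRead` inside `readMoreEntries`, as the PR description proposes, can be shown with a toy model. This is only a sketch under assumptions: `PendingReadGuardSketch`, `readPosition`, and `issuedReads` are hypothetical names, and the real dispatcher reads from a managed cursor rather than an integer position.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not broker code) of concurrent triggers asking for the same read. */
public class PendingReadGuardSketch {
    private final boolean checkPendingRead;
    private boolean havePendingRead;
    private int readPosition; // next entry id that a read would fetch
    private final List<Integer> issuedReads = new ArrayList<>();

    public PendingReadGuardSketch(boolean checkPendingRead) {
        this.checkPendingRead = checkPendingRead;
    }

    /** Called by every trigger, e.g. a consumer flow or a rescheduled read. */
    public synchronized void readMoreEntries() {
        if (checkPendingRead && havePendingRead) {
            return; // a read for this position is already in flight
        }
        havePendingRead = true;
        issuedReads.add(readPosition);
    }

    /** Called once the in-flight read has delivered its entry. */
    public synchronized void readEntriesComplete() {
        havePendingRead = false;
        readPosition++;
    }

    public synchronized List<Integer> issuedReads() {
        return new ArrayList<>(issuedReads);
    }

    public static void main(String[] args) {
        PendingReadGuardSketch unguarded = new PendingReadGuardSketch(false);
        unguarded.readMoreEntries();
        unguarded.readMoreEntries(); // second trigger before the first read completes
        System.out.println("without check: " + unguarded.issuedReads());

        PendingReadGuardSketch guarded = new PendingReadGuardSketch(true);
        guarded.readMoreEntries();
        guarded.readMoreEntries(); // ignored while the first read is pending
        guarded.readEntriesComplete();
        guarded.readMoreEntries();
        System.out.println("with check: " + guarded.issuedReads());
    }
}
```

Without the check, two triggers that arrive before the first read completes both request the same position, which corresponds to the repeated `entry:15` through `entry:20` lines in the log above.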
   




[GitHub] [pulsar] Technoboy- merged pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #17237:
URL: https://github.com/apache/pulsar/pull/17237




[GitHub] [pulsar] lhotari commented on a diff in pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r955788801


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);

Review Comment:
   I was running it locally. 
   ```
   java.lang.AssertionError: 
   Expected :30.0
   Actual   :44.0
   ```





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r953274502


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);

Review Comment:
   I tested it, it should be





[GitHub] [pulsar] syhily commented on pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
syhily commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1237188920

   We hit the same issue in flink-connector-pulsar.
   
   ```
   Sep 05 05:19:03 [ERROR] org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReaderTest.consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition(PulsarPartitionSplitReaderBase)[1]  Time elapsed: 30.569 s  <<< FAILURE!
   Sep 05 05:19:03 java.lang.AssertionError: 
   Sep 05 05:19:03 [We should fetch the expected size] 
   Sep 05 05:19:03 Expected size: 20 but was: 23 in:
   Sep 05 05:19:03 [PulsarMessage{id=148:0:0, value=xtvCoENDyD, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:1:0, value=LsfmAdOvPi, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:2:0, value=SADrPXpnsp, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:0:0, value=xtvCoENDyD, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:1:0, value=LsfmAdOvPi, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:2:0, value=SADrPXpnsp, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:3:0, value=BsEGjUZSJN, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:4:0, value=wvteCkUkjX, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:5:0, value=HUEpGPmjYy, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:6:0, value=kndeIFjLLK, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:7:0, value=gOmuNzCqbL, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:8:0, value=fvtNHyyqqj, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:9:0, value=iKrmISirqy, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:10:0, value=lQUCdQizRw, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:11:0, value=RkGRkfTAcS, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:12:0, value=mBcAGPkmpY, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:13:0, value=AMUWkVFpwU, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:14:0, value=zCtbxzylrl, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:15:0, value=OQNrSEYkFo, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:16:0, value=oLuIyaJZrI, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:17:0, value=PfbbxVuQKh, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:18:0, value=KeIgSeYDxM, eventTime=0},
   Sep 05 05:19:03     PulsarMessage{id=148:19:0, value=KZkeATFOln, eventTime=0}]
   Sep 05 05:19:03 	at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderTestBase.fetchedMessages(PulsarPartitionSplitReaderTestBase.java:199)
   Sep 05 05:19:03 	at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderTestBase.fetchedMessages(PulsarPartitionSplitReaderTestBase.java:169)
   Sep 05 05:19:03 	at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderTestBase.consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition(PulsarPartitionSplitReaderTestBase.java:260)
   ```
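
   To pull the redelivered IDs out of a list like the one above, a small helper can flag every ID seen more than once. This is only a sketch: `DuplicateIdSketch` and `findDuplicates` are hypothetical names, and the IDs are treated as plain strings rather than Pulsar `MessageId` objects.

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical helper, not part of Pulsar or the Flink connector.
   class DuplicateIdSketch {
       // Returns each ID that occurs more than once, in order of first repetition.
       static List<String> findDuplicates(List<String> ids) {
           Set<String> seen = new HashSet<>();
           Set<String> duplicates = new LinkedHashSet<>();
           for (String id : ids) {
               // Set.add returns false when the ID was already present.
               if (!seen.add(id)) {
                   duplicates.add(id);
               }
           }
           return new ArrayList<>(duplicates);
       }
   }
   ```

   Applied to the IDs in the failure above, this would report `148:0:0`, `148:1:0` and `148:2:0`, the three entries that were delivered twice.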




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r953405969


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -332,12 +332,17 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl
     }
 
     @Override
-    protected void readMoreEntries(Consumer consumer) {
+    protected synchronized void readMoreEntries(Consumer consumer) {

Review Comment:
   Modified the scope of the lock





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r953397765


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -332,12 +332,17 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl
     }
 
     @Override
-    protected void readMoreEntries(Consumer consumer) {
+    protected synchronized void readMoreEntries(Consumer consumer) {
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
             return;
         }
+        if (havePendingRead) {

Review Comment:
   I think we should move the `havePendingRead` check into the `readMoreEntries` method so that callers don't have to repeat it everywhere.
   I have changed all call sites except those where the check carries additional logic.
   Please take a look.
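
   A minimal sketch of the idea, assuming a hypothetical `PendingReadGuardSketch` class that stands in for the dispatcher (it is not the Pulsar code, and `readCompleted` and `raceCallers` are illustrative names). The guard lives in one place, so concurrent callers cannot start a second read while one is in flight:

   ```java
   // Hypothetical stand-in for a dispatcher; this is not the Pulsar class.
   class PendingReadGuardSketch {
       private boolean havePendingRead = false;
       private int readsIssued = 0;

       // Single entry point: the guard lives here instead of at every call site.
       synchronized boolean readMoreEntries() {
           if (havePendingRead) {
               return false; // a read is already in flight, so skip this one
           }
           havePendingRead = true;
           readsIssued++;
           return true;
       }

       // Called when the in-flight read completes or fails.
       synchronized void readCompleted() {
           havePendingRead = false;
       }

       synchronized int readsIssued() {
           return readsIssued;
       }

       // Races several callers against one dispatcher and reports how many reads were issued.
       static int raceCallers(int callers) {
           PendingReadGuardSketch dispatcher = new PendingReadGuardSketch();
           Thread[] threads = new Thread[callers];
           for (int i = 0; i < callers; i++) {
               threads[i] = new Thread(dispatcher::readMoreEntries);
               threads[i].start();
           }
           try {
               for (Thread t : threads) {
                   t.join();
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return dispatcher.readsIssued();
       }
   }
   ```

   Since nothing calls `readCompleted` during the race, `raceCallers` should report a single issued read no matter how many callers there are.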



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)

Review Comment:
   Sure





[GitHub] [pulsar] mattisonchao commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1224158414

   Because branch-2.10 also has this problem, I don't think it's a blocker for the 2.11 release.




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r955801428


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);

Review Comment:
   I will try to verify it. Maybe I missed something. Thank you very much!





[GitHub] [pulsar] codelipenghui commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1224226493

   @mattisonchao Do you know which PR introduced this issue?




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r953407731


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -336,25 +331,40 @@ protected void readMoreEntries(Consumer consumer) {
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
+            }
+            return;
+        }
+        if (havePendingRead) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+            }
             return;
         }
 
         if (consumer.getAvailablePermits() > 0) {
-            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
-            int messagesToRead = calculateResult.getLeft();
-            long bytesToRead = calculateResult.getRight();
+            synchronized (this) {

Review Comment:
   Moved the lock here so that `calculateToRead` is not invoked needlessly many times.
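
   Roughly, the narrower lock scope looks like the sketch below. `NarrowLockSketch` is a hypothetical class, the counter stands in for `calculateToRead`, and both the `volatile` flag and the re-check inside the lock are assumptions of this sketch rather than something the diff above confirms:

   ```java
   // Hypothetical illustration of narrowing a lock's scope; not the Pulsar class.
   class NarrowLockSketch {
       // volatile so the unlocked fast-path check sees recent writes (assumption of this sketch).
       private volatile boolean havePendingRead = false;
       private int calculations = 0;

       boolean readMoreEntries(int availablePermits) {
           // Cheap checks run without the lock, so skipped calls never contend for it.
           if (havePendingRead || availablePermits <= 0) {
               return false;
           }
           synchronized (this) {
               // Re-check under the lock: another thread may have won the race.
               if (havePendingRead) {
                   return false;
               }
               calculations++; // stands in for the calculateToRead work
               havePendingRead = true;
               return true;
           }
       }

       synchronized void readCompleted() {
           havePendingRead = false;
       }

       synchronized int calculations() {
           return calculations;
       }
   }
   ```

   The calculation only runs for the call that actually starts a read; calls that bail out early never reach it.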





[GitHub] [pulsar] mattisonchao commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1225285624

   While fixing the flaky test `SubscriptionMessageDispatchThrottlingTest`, I found that on one run we received more messages than we sent. So I added `Thread.sleep()` to verify it.
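
   The reason a wait is needed: the latch is released as soon as the expected number of messages has arrived, so any duplicates that arrive afterwards go unnoticed unless the test waits a little longer before counting. A minimal sketch with a simulated listener thread instead of a real consumer (`OverDeliveryCheckSketch` and `receivedAfterGrace` are hypothetical names):

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical simulation; a real test would use a Pulsar consumer's message listener.
   class OverDeliveryCheckSketch {
       // Simulates `delivered` listener callbacks and returns the count seen after the grace period.
       static int receivedAfterGrace(int expected, int delivered, long graceMillis) {
           if (delivered < expected) {
               throw new IllegalArgumentException("latch would never be released");
           }
           CountDownLatch latch = new CountDownLatch(expected);
           AtomicInteger totalReceived = new AtomicInteger();
           Thread listener = new Thread(() -> {
               for (int i = 0; i < delivered; i++) {
                   totalReceived.incrementAndGet();
                   latch.countDown(); // extra countDown calls past zero are no-ops
               }
           });
           listener.start();
           try {
               latch.await();             // returns once `expected` messages have arrived
               Thread.sleep(graceMillis); // give late duplicates time to show up
               listener.join();           // only here to make this sketch deterministic
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return totalReceived.get();
       }
   }
   ```

   With 30 expected and 30 delivered the count stays at 30. With 30 expected and 44 delivered the latch still releases, and only the count taken after the wait exposes the extra 14.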




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r953249195


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }

Review Comment:
   fixed.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r952509663


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)

Review Comment:
   You can add `invocationCount` on the `Test` annotation to call the new test multiple times to understand the problem.








[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r955855495


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);

Review Comment:
   Hi @lhotari 
   
   Could you tell me how to reproduce it, or do you have a stack trace?
   I ran this test 100 times and it always passes...  :(
   
   <img width="1599" alt="image" src="https://user-images.githubusercontent.com/74767115/186874129-2bbb0d2d-edc9-42d4-ba5d-8d5c650914f4.png">
   
   
   





[GitHub] [pulsar] eolivelli commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1224270123

   This is very interesting.
   How did you find this problem?
   It doesn't look like a regression from recently contributed code.




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r955801428


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);

Review Comment:
   I will try to verify it. Maybe I missed something.





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r953131445


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -332,12 +332,17 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl
     }
 
     @Override
-    protected void readMoreEntries(Consumer consumer) {
+    protected synchronized void readMoreEntries(Consumer consumer) {
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
             return;
         }
+        if (havePendingRead) {

Review Comment:
   I see several parts of the code that check `havePendingRead` themselves before calling this `readMoreEntries` method. I haven't yet verified the details of each call, but it seems like we should clean up those guards if we are moving the logic into this method.
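
The cleanup suggested here can be illustrated with a minimal sketch. It is not the Pulsar implementation: `GuardedDispatcher`, `readCompleted`, and `readsIssued` are hypothetical names, and the counter merely stands in for the real cursor read. The point is only that once the `havePendingRead` check lives inside `readMoreEntries`, callers can invoke it unconditionally.

```java
// Hypothetical, simplified stand-in for a single-active-consumer dispatcher.
class GuardedDispatcher {
    private boolean havePendingRead = false;
    private int readsIssued = 0; // counts simulated cursor reads

    // The guard lives inside the method, so call sites need no check of their own.
    synchronized void readMoreEntries(Object consumer) {
        if (consumer == null) {
            return; // no active consumer
        }
        if (havePendingRead) {
            return; // a read is already in flight, skip this request
        }
        havePendingRead = true;
        readsIssued++; // stands in for the asynchronous cursor read
    }

    // Simulates the read callback clearing the pending flag.
    synchronized void readCompleted() {
        havePendingRead = false;
    }

    synchronized int readsIssued() {
        return readsIssued;
    }
}

class GuardedDispatcherDemo {
    public static void main(String[] args) {
        GuardedDispatcher dispatcher = new GuardedDispatcher();
        Object consumer = new Object();
        dispatcher.readMoreEntries(consumer);
        dispatcher.readMoreEntries(consumer); // ignored: a read is pending
        System.out.println(dispatcher.readsIssued()); // prints 1
        dispatcher.readCompleted();
        dispatcher.readMoreEntries(consumer);
        System.out.println(dispatcher.readsIssued()); // prints 2
    }
}
```

With the guard centralized like this, a caller-side check becomes redundant rather than harmful, which is why removing it would be a cleanup and not a behavior change.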



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -332,12 +332,17 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl
     }
 
     @Override
-    protected void readMoreEntries(Consumer consumer) {
+    protected synchronized void readMoreEntries(Consumer consumer) {

Review Comment:
   It seems like we might not want to acquire the lock before the `consumer` null check.
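
One way to read this suggestion, sketched under assumptions: keep the method itself unsynchronized, return early on a `null` consumer, and take the monitor only around the check-and-set of `havePendingRead`. The class and member names below are hypothetical and the counter replaces the real cursor read.

```java
// Hypothetical sketch: the null check runs before the monitor is acquired.
class NullCheckFirstDispatcher {
    private boolean havePendingRead = false;
    private int readsIssued = 0; // counts simulated cursor reads

    void readMoreEntries(Object consumer) {
        if (consumer == null) {
            return; // decided without taking the lock
        }
        synchronized (this) {
            if (havePendingRead) {
                return; // a read is already in flight
            }
            havePendingRead = true;
            readsIssued++; // stands in for the asynchronous cursor read
        }
    }

    synchronized int readsIssued() {
        return readsIssued;
    }
}

class NullCheckFirstDemo {
    public static void main(String[] args) {
        NullCheckFirstDispatcher dispatcher = new NullCheckFirstDispatcher();
        dispatcher.readMoreEntries(null); // returns before locking
        System.out.println(dispatcher.readsIssued()); // prints 0
        dispatcher.readMoreEntries(new Object());
        System.out.println(dispatcher.readsIssued()); // prints 1
    }
}
```

The trade-off is small: the early return avoids contending for the monitor when there is nothing to do, at the cost of a slightly longer method body than a method-level `synchronized`.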





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r955855495


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);

Review Comment:
   Hi @lhotari 
   
   Could you tell me how to reproduce it, or do you have a stack trace?
   I ran this test 100 times and it always passed... :(
   
   <img width="1599" alt="image" src="https://user-images.githubusercontent.com/74767115/186874129-2bbb0d2d-edc9-42d4-ba5d-8d5c650914f4.png">
   
   
   





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r952498972


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java:
##########
@@ -175,14 +175,19 @@ public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntry
     }
 
     @Override
-    protected void readMoreEntries(Consumer consumer) {
+    protected synchronized void readMoreEntries(Consumer consumer) {
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
             return;
         }
+        if (havePendingRead) {

Review Comment:
   Check status ahead of time to avoid logging incorrect information.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -354,17 +359,15 @@ protected void readMoreEntries(Consumer consumer) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
             }
 
-            synchronized (this) {
-                havePendingRead = true;
-                if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
-                            this, consumer);
-                } else {
-                    ReadEntriesCtx readEntriesCtx =
-                            ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
-                    cursor.asyncReadEntriesOrWait(messagesToRead,
-                            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
-                }
+            havePendingRead = true;

Review Comment:
   I moved `synchronized` to the method level. Could you take a look?
   /cc @shibd 
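
A rough sketch of why method-level locking matters here, assuming the goal is for the `havePendingRead` check and the assignment to happen under the same monitor. `MethodLevelLockDispatcher` and its members are hypothetical, and the flag is deliberately never cleared so the outcome does not depend on thread scheduling.

```java
// Hypothetical sketch: check and set of the flag share one critical section.
class MethodLevelLockDispatcher {
    private boolean havePendingRead = false;
    private int readsIssued = 0; // counts simulated cursor reads

    // Because the whole method is synchronized, two threads cannot both
    // observe havePendingRead == false and both schedule a read.
    synchronized void readMoreEntries(Object consumer) {
        if (consumer == null || havePendingRead) {
            return;
        }
        havePendingRead = true;
        readsIssued++; // stands in for the asynchronous cursor read
    }

    synchronized int readsIssued() {
        return readsIssued;
    }
}

class MethodLevelLockDemo {
    public static void main(String[] args) throws InterruptedException {
        MethodLevelLockDispatcher dispatcher = new MethodLevelLockDispatcher();
        Object consumer = new Object();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> dispatcher.readMoreEntries(consumer));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(dispatcher.readsIssued()); // prints 1
    }
}
```

If only the assignment were inside a `synchronized (this)` block while the check stayed outside, several threads could pass the check before any of them set the flag, which matches the duplicate-read symptom this PR describes.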





[GitHub] [pulsar] mattisonchao commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1223952655

   I will add more information later.




[GitHub] [pulsar] mattisonchao commented on pull request #17237: [fix][broker] Fix dispatch the duplicated message with `Exclusive` mode.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#issuecomment-1223949503

   I will add more information later.

