You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:06:56 UTC

[pulsar] branch master updated: [fix][broker]Fix topic-level replicator rate limiter not init (#15825)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f40cc1d110 [fix][broker]Fix topic-level replicator rate limiter not init (#15825)
9f40cc1d110 is described below

commit 9f40cc1d1104900c450a599676ca446b1f096a00
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Jun 13 10:06:46 2022 +0800

    [fix][broker]Fix topic-level replicator rate limiter not init (#15825)
    
    ### Motivation
    
    
    * Fix bug: the replicator rate limiter is never initialized or updated when only a topic-level policy is enabled, because `replicator.getRateLimiter()` returns an empty `Optional` at line 3067 of `PersistentTopic.java`:
    https://github.com/apache/pulsar/blob/a43981109a9322d94082ae0d87d0de53b8f237e8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3063-L3068
    
    ### Modifications
    
    * Add the method `org.apache.pulsar.broker.service.Replicator#updateRateLimiter`, which initializes the replicator rate limiter if needed and then updates it.
    * Use this method to initialize or update the replicator rate limiter at every policy level (broker, namespace, and topic).
---
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../apache/pulsar/broker/service/Replicator.java   |   3 +
 .../service/persistent/PersistentReplicator.java   |  15 ++-
 .../broker/service/persistent/PersistentTopic.java |  11 +-
 .../broker/service/ReplicatorRateLimiterTest.java  | 128 +++++++++++++++++++++
 5 files changed, 146 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bf21b0a6fdc..b991c4378e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2417,7 +2417,7 @@ public class BrokerService implements Closeable {
                         ((AbstractTopic) topic).updateBrokerReplicatorDispatchRate();
                     }
                     topic.getReplicators().forEach((name, persistentReplicator) ->
-                        persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+                        persistentReplicator.updateRateLimiter());
                 }
             );
         });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 2cd6ec62327..eea90efb883 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -41,6 +41,9 @@ public interface Replicator {
         //No-op
     }
 
+    default void updateRateLimiter() {
+    }
+
     default Optional<DispatchRateLimiter> getRateLimiter() {
         return Optional.empty();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cc5410dbbeb..953300e823f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -71,6 +71,7 @@ public class PersistentReplicator extends AbstractReplicator
     protected final ManagedCursor cursor;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+    private final Object dispatchRateLimiterLock = new Object();
 
     private int readBatchSize;
     private final int readMaxSizeBytes;
@@ -705,12 +706,20 @@ public class PersistentReplicator extends AbstractReplicator
 
     @Override
     public void initializeDispatchRateLimiterIfNeeded() {
-        if (!dispatchRateLimiter.isPresent()
-            && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
-            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
+        synchronized (dispatchRateLimiterLock) {
+            if (!dispatchRateLimiter.isPresent()
+                && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
+                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
+            }
         }
     }
 
+    @Override
+    public void updateRateLimiter() {
+        initializeDispatchRateLimiterIfNeeded();
+        dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
+    }
+
     private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
         if (!msg.getMessageBuilder().hasMarkerType()) {
             // No marker is defined
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 18fd4873435..4d953aa347d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -383,10 +383,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 && DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
                 this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
             }
-
-            // dispatch rate limiter for each replicator
-            replicators.forEach((name, replicator) ->
-                replicator.initializeDispatchRateLimiterIfNeeded());
         }
     }
 
@@ -2396,9 +2392,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
             return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
-                replicators.forEach((name, replicator) ->
-                        replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-                );
+                replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
                 checkMessageExpiry();
                 CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
                 CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
@@ -3027,8 +3021,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
             updatePublishDispatcher();
             updateSubscribeRateLimiter();
-            replicators.forEach((name, replicator) -> replicator.getRateLimiter()
-                    .ifPresent(DispatchRateLimiter::updateDispatchRate));
+            replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
 
             if (policies.getReplicationClusters() != null) {
                 checkReplicationAndRetryOnFailure();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 3ace687d5d9..ec39fcda764 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertFalse;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.util.concurrent.TimeUnit;
@@ -76,6 +77,133 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
         return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
     }
 
+    @Test
+    public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception {
+        cleanup();
+        config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level
+        config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+        setup();
+
+        final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyTopicLevel";
+
+        admin1.namespaces().createNamespace(namespace);
+        // set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+        client1.newProducer().topic(topicName).create().close();
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // rate limiter disable by default
+        assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+        //set topic-level policy, which should take effect
+        DispatchRate topicRate = DispatchRate.builder()
+            .dispatchThrottlingRateInMsg(10)
+            .dispatchThrottlingRateInByte(20)
+            .ratePeriodInSecond(30)
+            .build();
+        admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
+        assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+        //remove topic-level policy
+        admin1.topics().removeReplicatorDispatchRate(topicName);
+        Awaitility.await().untilAsserted(() ->
+            assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+            -1L);
+    }
+
+    @Test
+    public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception {
+        cleanup();
+        config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level
+        config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+        setup();
+
+        final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyNamespaceLevel";
+
+        admin1.namespaces().createNamespace(namespace);
+        // set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+        client1.newProducer().topic(topicName).create().close();
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // rate limiter disable by default
+        assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+        //set namespace-level policy, which should take effect
+        DispatchRate topicRate = DispatchRate.builder()
+            .dispatchThrottlingRateInMsg(10)
+            .dispatchThrottlingRateInByte(20)
+            .ratePeriodInSecond(30)
+            .build();
+        admin1.namespaces().setReplicatorDispatchRate(namespace, topicRate);
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), topicRate));
+        assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+        //remove namespace-level policy
+        admin1.namespaces().removeReplicatorDispatchRate(namespace);
+        Awaitility.await().untilAsserted(() ->
+            assertNull(admin1.namespaces().getReplicatorDispatchRate(namespace)));
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+            -1L);
+    }
+
+    @Test
+    public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception {
+        cleanup();
+        config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level when init
+        config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+        setup();
+
+        final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyBrokerLevel";
+
+        admin1.namespaces().createNamespace(namespace);
+        // set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+        client1.newProducer().topic(topicName).create().close();
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // rate limiter disable by default
+        assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+        //set broker-level policy, which should take effect
+        admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg", "10");
+        admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInByte", "20");
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(admin1.brokers()
+                .getAllDynamicConfigurations().containsKey("dispatchThrottlingRatePerReplicatorInByte"));
+            assertEquals(admin1.brokers()
+                .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInMsg"), "10");
+            assertEquals(admin1.brokers()
+                .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"), "20");
+        });
+
+        assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+    }
+
     @Test
     public void testReplicatorRatePriority() throws Exception {
         cleanup();