You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:06:56 UTC
[pulsar] branch master updated: [fix][broker]Fix topic-level replicator rate limiter not init (#15825)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f40cc1d110 [fix][broker]Fix topic-level replicator rate limiter not init (#15825)
9f40cc1d110 is described below
commit 9f40cc1d1104900c450a599676ca446b1f096a00
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Jun 13 10:06:46 2022 +0800
[fix][broker]Fix topic-level replicator rate limiter not init (#15825)
### Motivation
* Fix bug: The replicator rate limiter will not be initialized and updated if only topic-level policy is enabled, because `replicator.getRateLimiter()` is empty for L3067:
https://github.com/apache/pulsar/blob/a43981109a9322d94082ae0d87d0de53b8f237e8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3063-L3068
### Modifications
* Add the method `org.apache.pulsar.broker.service.Replicator#updateRateLimiter` to initialize or update the replicator rate limiter.
* Use this method to initialize or update all level replicator rate limiter
---
.../pulsar/broker/service/BrokerService.java | 2 +-
.../apache/pulsar/broker/service/Replicator.java | 3 +
.../service/persistent/PersistentReplicator.java | 15 ++-
.../broker/service/persistent/PersistentTopic.java | 11 +-
.../broker/service/ReplicatorRateLimiterTest.java | 128 +++++++++++++++++++++
5 files changed, 146 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bf21b0a6fdc..b991c4378e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2417,7 +2417,7 @@ public class BrokerService implements Closeable {
((AbstractTopic) topic).updateBrokerReplicatorDispatchRate();
}
topic.getReplicators().forEach((name, persistentReplicator) ->
- persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+ persistentReplicator.updateRateLimiter());
}
);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 2cd6ec62327..eea90efb883 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -41,6 +41,9 @@ public interface Replicator {
//No-op
}
+ default void updateRateLimiter() {
+ }
+
default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cc5410dbbeb..953300e823f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -71,6 +71,7 @@ public class PersistentReplicator extends AbstractReplicator
protected final ManagedCursor cursor;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+ private final Object dispatchRateLimiterLock = new Object();
private int readBatchSize;
private final int readMaxSizeBytes;
@@ -705,12 +706,20 @@ public class PersistentReplicator extends AbstractReplicator
@Override
public void initializeDispatchRateLimiterIfNeeded() {
- if (!dispatchRateLimiter.isPresent()
- && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
- this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
+ synchronized (dispatchRateLimiterLock) {
+ if (!dispatchRateLimiter.isPresent()
+ && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
+ this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
+ }
}
}
+ @Override
+ public void updateRateLimiter() {
+ initializeDispatchRateLimiterIfNeeded();
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
+ }
+
private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
if (!msg.getMessageBuilder().hasMarkerType()) {
// No marker is defined
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 18fd4873435..4d953aa347d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -383,10 +383,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
&& DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
-
- // dispatch rate limiter for each replicator
- replicators.forEach((name, replicator) ->
- replicator.initializeDispatchRateLimiterIfNeeded());
}
}
@@ -2396,9 +2392,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
- replicators.forEach((name, replicator) ->
- replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
- );
+ replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
@@ -3027,8 +3021,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
- replicators.forEach((name, replicator) -> replicator.getRateLimiter()
- .ifPresent(DispatchRateLimiter::updateDispatchRate));
+ replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
if (policies.getReplicationClusters() != null) {
checkReplicationAndRetryOnFailure();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 3ace687d5d9..ec39fcda764 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertFalse;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@@ -76,6 +77,133 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}
+ @Test
+ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyTopicLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter disable by default
+ assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set topic-level policy, which should take effect
+ DispatchRate topicRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(10)
+ .dispatchThrottlingRateInByte(20)
+ .ratePeriodInSecond(30)
+ .build();
+ admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
+ assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+ //remove topic-level policy
+ admin1.topics().removeReplicatorDispatchRate(topicName);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ -1L);
+ }
+
+ @Test
+ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyNamespaceLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter disable by default
+ assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set namespace-level policy, which should take effect
+ DispatchRate topicRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(10)
+ .dispatchThrottlingRateInByte(20)
+ .ratePeriodInSecond(30)
+ .build();
+ admin1.namespaces().setReplicatorDispatchRate(namespace, topicRate);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), topicRate));
+ assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+ //remove topic-level policy
+ admin1.namespaces().removeReplicatorDispatchRate(namespace);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin1.namespaces().getReplicatorDispatchRate(namespace)));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ -1L);
+ }
+
+ @Test
+ public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level when init
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyBrokerLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter disable by default
+ assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set broker-level policy, which should take effect
+ admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg", "10");
+ admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInByte", "20");
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(admin1.brokers()
+ .getAllDynamicConfigurations().containsKey("dispatchThrottlingRatePerReplicatorInByte"));
+ assertEquals(admin1.brokers()
+ .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInMsg"), "10");
+ assertEquals(admin1.brokers()
+ .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"), "20");
+ });
+
+ assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+ }
+
@Test
public void testReplicatorRatePriority() throws Exception {
cleanup();