You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/14 09:24:13 UTC
[pulsar] branch branch-2.10 updated: [fix][broker]Fix topic-level replicator rate limiter not init (#15825)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a29feca574d [fix][broker]Fix topic-level replicator rate limiter not init (#15825)
a29feca574d is described below
commit a29feca574d581e413ca53c703a86b4e4b60e7fb
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Jun 13 10:06:46 2022 +0800
[fix][broker]Fix topic-level replicator rate limiter not init (#15825)
* Fix bug: the replicator rate limiter is never initialized or updated when only a topic-level policy is enabled, because `replicator.getRateLimiter()` is empty at L3067:
https://github.com/apache/pulsar/blob/a43981109a9322d94082ae0d87d0de53b8f237e8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3063-L3068
* Add the method `org.apache.pulsar.broker.service.Replicator#updateRateLimiter` to initialize or update the replicator rate limiter.
* Use this method to initialize or update the replicator rate limiter at every policy level (broker, namespace, and topic).
(cherry picked from commit 9f40cc1d1104900c450a599676ca446b1f096a00)
---
.../pulsar/broker/service/BrokerService.java | 2 +-
.../apache/pulsar/broker/service/Replicator.java | 3 +
.../service/persistent/PersistentReplicator.java | 15 ++-
.../broker/service/persistent/PersistentTopic.java | 11 +-
.../broker/service/ReplicatorRateLimiterTest.java | 128 +++++++++++++++++++++
5 files changed, 146 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index cda3d90fd98..3aa0c73ae8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2318,7 +2318,7 @@ public class BrokerService implements Closeable {
((AbstractTopic) topic).updateBrokerReplicatorDispatchRate();
}
topic.getReplicators().forEach((name, persistentReplicator) ->
- persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+ persistentReplicator.updateRateLimiter());
}
);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 2cd6ec62327..eea90efb883 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -41,6 +41,9 @@ public interface Replicator {
//No-op
}
+ default void updateRateLimiter() {
+ }
+
default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cc5410dbbeb..953300e823f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -71,6 +71,7 @@ public class PersistentReplicator extends AbstractReplicator
protected final ManagedCursor cursor;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+ private final Object dispatchRateLimiterLock = new Object();
private int readBatchSize;
private final int readMaxSizeBytes;
@@ -705,12 +706,20 @@ public class PersistentReplicator extends AbstractReplicator
@Override
public void initializeDispatchRateLimiterIfNeeded() {
- if (!dispatchRateLimiter.isPresent()
- && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
- this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
+ synchronized (dispatchRateLimiterLock) {
+ if (!dispatchRateLimiter.isPresent()
+ && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
+ this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
+ }
}
}
+ @Override
+ public void updateRateLimiter() {
+ initializeDispatchRateLimiterIfNeeded();
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
+ }
+
private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
if (!msg.getMessageBuilder().hasMarkerType()) {
// No marker is defined
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 27bb77fbc5d..a82bdb8d286 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -392,10 +392,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
dispatcher.initializeDispatchRateLimiterIfNeeded();
}
});
-
- // dispatch rate limiter for each replicator
- replicators.forEach((name, replicator) ->
- replicator.initializeDispatchRateLimiterIfNeeded());
}
}
@@ -2416,9 +2412,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
});
return FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
- replicators.forEach((name, replicator) ->
- replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
- );
+ replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
@@ -3075,8 +3069,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
- replicators.forEach((name, replicator) -> replicator.getRateLimiter()
- .ifPresent(DispatchRateLimiter::updateDispatchRate));
+ replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
if (policies.getReplicationClusters() != null) {
checkReplicationAndRetryOnFailure();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 65e4bb0a785..fdf27adc718 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertFalse;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@@ -76,6 +77,133 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}
+ @Test
+ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyTopicLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter is disabled by default
+ assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set topic-level policy, which should take effect
+ DispatchRate topicRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(10)
+ .dispatchThrottlingRateInByte(20)
+ .ratePeriodInSecond(30)
+ .build();
+ admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
+ assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+ //remove topic-level policy
+ admin1.topics().removeReplicatorDispatchRate(topicName);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ -1L);
+ }
+
+ @Test
+ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyNamespaceLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter is disabled by default
+ assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set namespace-level policy, which should take effect
+ DispatchRate topicRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(10)
+ .dispatchThrottlingRateInByte(20)
+ .ratePeriodInSecond(30)
+ .build();
+ admin1.namespaces().setReplicatorDispatchRate(namespace, topicRate);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), topicRate));
+ assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+ //remove namespace-level policy
+ admin1.namespaces().removeReplicatorDispatchRate(namespace);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin1.namespaces().getReplicatorDispatchRate(namespace)));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ -1L);
+ }
+
+ @Test
+ public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable broker level when init
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace + "/testReplicatorRateLimiterWithOnlyBrokerLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter is disabled by default
+ assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set broker-level policy, which should take effect
+ admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg", "10");
+ admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInByte", "20");
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(admin1.brokers()
+ .getAllDynamicConfigurations().containsKey("dispatchThrottlingRatePerReplicatorInByte"));
+ assertEquals(admin1.brokers()
+ .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInMsg"), "10");
+ assertEquals(admin1.brokers()
+ .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"), "20");
+ });
+
+ assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+ }
+
@Test
public void testReplicatorRatePriority() throws Exception {
cleanup();