You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/22 01:58:24 UTC
[pulsar] branch master updated: [Issue 12726][broker] Fix deadlock in metadata-store callback thread (#12753)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ebfcd71 [Issue 12726][broker] Fix deadlock in metadata-store callback thread (#12753)
ebfcd71 is described below
commit ebfcd713bc07b213cbba992192b9a1f5cc35d971
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Nov 22 09:57:12 2021 +0800
[Issue 12726][broker] Fix deadlock in metadata-store callback thread (#12753)
Fixes #12726
### Motivation
See #12726
### Modifications
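Cache the backlog quota policies in PolicyHierarchyValue (broker, namespace, and topic levels) so that looking up a topic's backlog quota reads the cached value instead of issuing a blocking metadata query on the metadata-store callback thread.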
---
.../pulsar/broker/service/AbstractTopic.java | 42 ++++++++--
.../pulsar/broker/service/BacklogQuotaManager.java | 39 +--------
.../service/nonpersistent/NonPersistentTopic.java | 7 +-
.../broker/service/persistent/PersistentTopic.java | 31 ++++---
.../pulsar/broker/admin/TopicPoliciesTest.java | 96 ++++++++++++++--------
.../service/persistent/MessageDuplicationTest.java | 38 +++++----
.../policies/data/HierarchyTopicPolicies.java | 8 ++
7 files changed, 149 insertions(+), 112 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d505971..26c591d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -37,6 +38,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
@@ -51,6 +53,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -140,13 +143,8 @@ public abstract class AbstractTopic implements Topic {
ServiceConfiguration config = brokerService.pulsar().getConfiguration();
this.replicatorPrefix = config.getReplicatorPrefix();
-
topicPolicies = new HierarchyTopicPolicies();
- topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
- config.getBrokerDeleteInactiveTopicsMode(),
- config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
- config.isBrokerDeleteInactiveTopicsEnabled()));
- topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
+ updateTopicPolicyByBrokerConfig(topicPolicies, brokerService);
this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(
config.getMaxMessageSizeCheckIntervalInSeconds());
@@ -156,6 +154,38 @@ public abstract class AbstractTopic implements Topic {
updatePublishDispatcher(Optional.empty());
}
+ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
+ }
+ if (namespacePolicies.deleted) {
+ return;
+ }
+ topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
+ topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
+ Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
+ type -> this.topicPolicies.getBackLogQuotaMap().get(type)
+ .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
+ }
+
+ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicies, BrokerService brokerService) {
+ ServiceConfiguration config = brokerService.pulsar().getConfiguration();
+ topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
+ config.getBrokerDeleteInactiveTopicsMode(),
+ config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
+ config.isBrokerDeleteInactiveTopicsEnabled()));
+
+ topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
+
+ //init backlogQuota
+ topicPolicies.getBackLogQuotaMap()
+ .get(BacklogQuota.BacklogQuotaType.destination_storage)
+ .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());
+ topicPolicies.getBackLogQuotaMap()
+ .get(BacklogQuota.BacklogQuotaType.message_age)
+ .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());
+ }
+
protected boolean isProducersExceeded() {
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 042f9ff..2df198c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -33,23 +32,17 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
public class BacklogQuotaManager {
private final BacklogQuotaImpl defaultQuota;
- private final PulsarService pulsar;
- private final boolean isTopicLevelPoliciesEnable;
private final NamespaceResources namespaceResources;
-
public BacklogQuotaManager(PulsarService pulsar) {
- this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
double backlogQuotaGB = pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB();
this.defaultQuota = BacklogQuotaImpl.builder()
.limitSize(backlogQuotaGB > 0 ? (long) (backlogQuotaGB * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
@@ -58,7 +51,6 @@ public class BacklogQuotaManager {
.retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
.build();
this.namespaceResources = pulsar.getPulsarResources().getNamespaceResources();
- this.pulsar = pulsar;
}
public BacklogQuotaImpl getDefaultQuota() {
@@ -77,34 +69,6 @@ public class BacklogQuotaManager {
}
}
- public BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuotaType backlogQuotaType) {
- if (!isTopicLevelPoliciesEnable) {
- return getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType);
- }
-
- try {
- return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
- .map(TopicPolicies::getBackLogQuotaMap)
- .map(map -> map.get(backlogQuotaType.name()))
- .orElseGet(() -> getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType));
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.debug("Topic policies cache have not init, will apply the namespace backlog quota: topicName={}",
- topicName);
- } catch (Exception e) {
- log.error("Failed to read topic policies data, "
- + "will apply the namespace backlog quota: topicName={}", topicName, e);
- }
- return getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType);
- }
-
- public long getBacklogQuotaLimitInSize(TopicName topicName) {
- return getBacklogQuota(topicName, BacklogQuotaType.destination_storage).getLimitSize();
- }
-
- public int getBacklogQuotaLimitInTime(TopicName topicName) {
- return getBacklogQuota(topicName, BacklogQuotaType.message_age).getLimitTime();
- }
-
/**
* Handle exceeded size backlog by using policies set in the zookeeper for given topic.
*
@@ -112,8 +76,7 @@ public class BacklogQuotaManager {
*/
public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
boolean preciseTimeBasedBacklogQuotaCheck) {
- TopicName topicName = TopicName.get(persistentTopic.getName());
- BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType);
+ BacklogQuota quota = persistentTopic.getBacklogQuota(backlogQuotaType);
log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType,
persistentTopic.getName(), quota.getPolicy());
switch (quota.getPolicy()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3c5831e..b2f9e61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -149,9 +149,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
isEncryptionRequired = false;
} else {
Policies policies = optPolicies.get();
+ updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
- topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(policies.inactive_topic_policies);
setSchemaCompatibilityStrategy(policies);
schemaValidationEnforced = policies.schema_validation_enforced;
}
@@ -964,6 +964,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
data.encryption_required);
}
+
+ updateTopicPolicyByNamespacePolicy(data);
+
isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
@@ -975,8 +978,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
- this.topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(data.inactive_topic_policies);
-
return checkReplicationAndRetryOnFailure();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 78f0fda..38661b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -31,6 +31,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -126,6 +127,7 @@ import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
@@ -317,6 +319,9 @@ public class PersistentTopic extends AbstractTopic
}
Policies policies = optPolicies.get();
+
+ this.updateTopicPolicyByNamespacePolicy(policies);
+
this.isEncryptionRequired = policies.encryption_required;
setSchemaCompatibilityStrategy(policies);
@@ -324,8 +329,6 @@ public class PersistentTopic extends AbstractTopic
schemaValidationEnforced = policies.schema_validation_enforced;
- topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(policies.inactive_topic_policies);
-
updateUnackedMessagesAppliedOnSubscription(policies);
updateUnackedMessagesExceededOnConsumer(policies);
}).exceptionally(ex -> {
@@ -2399,6 +2402,9 @@ public class PersistentTopic extends AbstractTopic
log.debug("Ignore the update because it has been deleted : {}", data);
return CompletableFuture.completedFuture(null);
}
+
+ updateTopicPolicyByNamespacePolicy(data);
+
isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
@@ -2407,7 +2413,6 @@ public class PersistentTopic extends AbstractTopic
schemaValidationEnforced = data.schema_validation_enforced;
updateUnackedMessagesAppliedOnSubscription(data);
updateUnackedMessagesExceededOnConsumer(data);
- this.topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(data.max_subscriptions_per_topic);
if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
@@ -2416,8 +2421,6 @@ public class PersistentTopic extends AbstractTopic
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
Optional<TopicPolicies> topicPolicies = getTopicPolicies();
- this.topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(data.inactive_topic_policies);
-
initializeRateLimiterIfNeeded(Optional.ofNullable(data));
this.updateMaxPublishRate(data);
@@ -2453,7 +2456,6 @@ public class PersistentTopic extends AbstractTopic
subscribeRateLimiter.get().onPoliciesUpdate(data);
}
-
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
preCreateSubscriptionForCompactionIfNeeded());
}
@@ -2463,9 +2465,8 @@ public class PersistentTopic extends AbstractTopic
* @return Backlog quota for topic
*/
@Override
- public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
- TopicName topicName = TopicName.get(this.getName());
- return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType);
+ public BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType) {
+ return this.topicPolicies.getBackLogQuotaMap().get(backlogQuotaType).get();
}
/**
@@ -2497,8 +2498,7 @@ public class PersistentTopic extends AbstractTopic
* @return determine if backlog quota enforcement needs to be done for topic based on size limit
*/
public boolean isSizeBacklogExceeded() {
- TopicName topicName = TopicName.get(getName());
- long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInSize(topicName);
+ long backlogQuotaLimitInBytes = getBacklogQuota(BacklogQuotaType.destination_storage).getLimitSize();
if (backlogQuotaLimitInBytes < 0) {
return false;
}
@@ -2521,7 +2521,7 @@ public class PersistentTopic extends AbstractTopic
public boolean isTimeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
CompletableFuture<Boolean> future = new CompletableFuture<>();
- int backlogQuotaLimitInSecond = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInTime(topicName);
+ int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
// If backlog quota by time is not set and we have no durable cursor.
if (backlogQuotaLimitInSecond <= 0
@@ -3088,8 +3088,13 @@ public class PersistentTopic extends AbstractTopic
}
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(policies.getMaxSubscriptionsPerTopic());
-
topicPolicies.getInactiveTopicPolicies().updateTopicValue(policies.getInactiveTopicPolicies());
+ Arrays.stream(BacklogQuotaType.values()).forEach(type ->
+ this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
+ policies.getBackLogQuotaMap() == null ? null :
+ policies.getBackLogQuotaMap().get(type.toString()))
+ );
+
updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index d4a7963..e0e6b5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -44,7 +44,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -100,6 +99,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
private final String topicPolicyEventsTopic = "persistent://" + myNamespace + "/__change_events";
+ private final int testTopicPartitions = 2;
+
@BeforeMethod
@Override
protected void setup() throws Exception {
@@ -113,7 +114,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant(this.testTenant, tenantInfo);
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
- admin.topics().createPartitionedTopic(testTopic, 2);
+ admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
Producer producer = pulsarClient.newProducer().topic(testTopic).create();
producer.close();
waitForZooKeeperWatchers();
@@ -141,13 +142,13 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
-
- BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
- BacklogQuota backlogQuotaInManager = backlogQuotaManager
- .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage);
- log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
- Assert.assertEquals(backlogQuota, backlogQuotaInManager);
-
+ for (int i = 0; i < testTopicPartitions; i++) {
+ String partition = TopicName.get(testTopic).getPartition(i).toString();
+ Topic topic = pulsar.getBrokerService().getTopic(partition, false).get().get();
+ BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+ log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInTopic, testTopic);
+ Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
+ }
admin.topics().deletePartitionedTopic(testTopic, true);
}
@@ -164,18 +165,28 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota));
-
- BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
- BacklogQuota backlogQuotaInManager = backlogQuotaManager
- .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age);
-
- Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+ for (int i = 0; i < testTopicPartitions; i++) {
+ String partition = TopicName.get(testTopic).getPartition(i).toString();
+ Topic topic = pulsar.getBrokerService().getTopic(partition, false).get().get();
+ BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
+ Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
+ }
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveSizeBasedBacklogQuota() throws Exception {
+ List<Topic> partitions = new ArrayList<>();
+ List<BacklogQuota> defaultBacklogQuotas = new ArrayList<>();
+ for (int i = 0; i < testTopicPartitions; i++) {
+ String partition = TopicName.get(testTopic).getPartition(i).toString();
+ Topic topic = pulsar.getBrokerService().getOrCreateTopic(partition).get();
+ partitions.add(topic);
+ BacklogQuota defaultBacklogQuota = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+ defaultBacklogQuotas.add(defaultBacklogQuota);
+ }
+
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
@@ -189,52 +200,69 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
.untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
- BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
- BacklogQuota backlogQuotaInManager = backlogQuotaManager
- .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage);
- log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
- Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+ partitions.forEach(topic -> {
+ BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+ log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInTopic, testTopic);
+ Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
+ });
admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage)));
-
- backlogQuotaInManager = backlogQuotaManager
- .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage);
- log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
- testTopic);
- Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
+ for (int i = 0; i < partitions.size(); i++) {
+ BacklogQuota backlogQuotaInTopic =
+ partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+ log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInTopic,
+ testTopic);
+ Assert.assertEquals(defaultBacklogQuotas.get(i), backlogQuotaInTopic);
+ }
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveTimeBasedBacklogQuota() throws Exception {
+ List<Topic> partitions = new ArrayList<>();
+ List<BacklogQuota> defaultBacklogQuotas = new ArrayList<>();
+ for (int i = 0; i < testTopicPartitions; i++) {
+ String partition = TopicName.get(testTopic).getPartition(i).toString();
+ Topic topic = pulsar.getBrokerService().getOrCreateTopic(partition).get();
+ partitions.add(topic);
+ BacklogQuota defaultBacklogQuota = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
+ defaultBacklogQuotas.add(defaultBacklogQuota);
+ }
+
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitTime(1000)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
-
admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota));
- BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
- BacklogQuota backlogQuotaInManager = backlogQuotaManager
- .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age);
- Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+ for (int i = 0; i < partitions.size(); i++) {
+ Assert.assertEquals(partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age),
+ backlogQuota);
+ //destination_storage should keep the same.
+ Assert.assertEquals(partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage),
+ defaultBacklogQuotas.get(i));
+ }
admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age)));
- backlogQuotaInManager = backlogQuotaManager
- .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age);
- Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
+ for (int i = 0; i < partitions.size(); i++) {
+ BacklogQuota backlogQuotaInTopic =
+ partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
+ log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInTopic,
+ testTopic);
+ Assert.assertEquals(defaultBacklogQuotas.get(i), backlogQuotaInTopic);
+ }
admin.topics().deletePartitionedTopic(testTopic, true);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 866ed13..c23c20f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -18,24 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.EventLoopGroup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.testng.annotations.Test;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -50,6 +32,24 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+import java.lang.reflect.Field;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
@@ -218,6 +218,7 @@ public class MessageDuplicationTest {
serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+ doReturn(mock(PulsarResources.class)).when(pulsarService).getPulsarResources();
ManagedLedger managedLedger = mock(ManagedLedger.class);
MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
@@ -236,6 +237,7 @@ public class MessageDuplicationTest {
BrokerService brokerService = mock(BrokerService.class);
doReturn(eventLoopGroup).when(brokerService).executor();
doReturn(pulsarService).when(brokerService).pulsar();
+ doReturn(new BacklogQuotaManager(pulsarService)).when(brokerService).getBacklogQuotaManager();
PersistentTopic persistentTopic = spy(new PersistentTopic("topic-1", brokerService, managedLedger, messageDeduplication));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 197b973..47df060 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.common.policies.data;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
import lombok.Getter;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
/**
* Topic policy hierarchy value container
@@ -28,9 +31,14 @@ import lombok.Getter;
public class HierarchyTopicPolicies {
final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
final PolicyHierarchyValue<Integer> maxSubscriptionsPerTopic;
+ final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
public HierarchyTopicPolicies() {
inactiveTopicPolicies = new PolicyHierarchyValue<>();
maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
+ backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
+ .put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
+ .put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
+ .build();
}
}