You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/06 03:26:12 UTC
[pulsar] branch branch-2.7 updated: [Branch-2.7][Broker] Fix using
partitioned topic name to get topic policies (#11897)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 3876c56 [Branch-2.7][Broker] Fix using partitioned topic name to get topic policies (#11897)
3876c56 is described below
commit 3876c56e6a6a0614dd5d5fb11d91fdc9c8f4a259
Author: ran <ga...@126.com>
AuthorDate: Mon Sep 6 11:25:29 2021 +0800
[Branch-2.7][Broker] Fix using partitioned topic name to get topic policies (#11897)
### Motivation
There is a bug that using the partitioned topic name to get topic policies. The PR https://github.com/apache/pulsar/pull/11294 fix this issue, but it's hard to cherry-pick the PR to `branch-2.7`, so create this PR to fix the issue in `branch-2.7`.
This PR contains [PR-11294](https://github.com/apache/pulsar/pull/11294) and [PR-11863](https://github.com/apache/pulsar/pull/11863).
### Modifications
1. Fix using the partitioned topic name to get topic policies.
2. Change some warning logs to debug level for the getting topic policies operation.
### Verifying this change
The test method `TopicPoliciesTest#testBacklogQuotaWithPartitionedTopic` is used to verify getting topic policies by the partitioned topic name.
---
.../pulsar/broker/service/AbstractTopic.java | 8 ++-----
.../pulsar/broker/service/BacklogQuotaManager.java | 6 +++++-
.../pulsar/broker/service/BrokerService.java | 16 ++++----------
.../SystemTopicBasedTopicPoliciesService.java | 4 ++--
.../pulsar/broker/admin/TopicPoliciesTest.java | 25 ++++++++++++++++++++++
.../pulsar/broker/service/BrokerServiceTest.java | 2 --
6 files changed, 38 insertions(+), 23 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0d40ecb..23b2221 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -559,14 +559,10 @@ public abstract class AbstractTopic implements Topic {
* @return TopicPolicies is exist else return null.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
- TopicName cloneTopicName = topicName;
- if (topicName.isPartitioned()) {
- cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
- }
try {
- return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+ log.debug("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
return null;
} catch (NullPointerException e) {
log.warn("Topic level policies are not enabled. " +
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 4b965fd..020efa1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -86,8 +86,12 @@ public class BacklogQuotaManager {
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> map.get(BacklogQuotaType.destination_storage.name()))
.orElseGet(() -> getBacklogQuota(topicName.getNamespace(),policyPath));
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.debug("Topic policies cache have not init, will apply the namespace backlog quota: topicName={}",
+ topicName);
} catch (Exception e) {
- log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}", topicName, e);
+ log.error("Failed to read topic policies data, "
+ + "will apply the namespace backlog quota: topicName={}", topicName, e);
}
return getBacklogQuota(topicName.getNamespace(),policyPath);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4a18585..b7eca90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1182,19 +1182,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
OffloadPolicies topicLevelOffloadPolicies = null;
if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
- TopicName cloneTopicName = topicName;
- if (topicName.isPartitioned()) {
- cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
- }
try {
- TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
persistencePolicies = topicPolicies.getPersistence();
retentionPolicies = topicPolicies.getRetentionPolicies();
topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.warn("Topic {} policies cache have not init.", topicName);
+ log.debug("Topic {} policies have not been initialized yet.", topicName);
}
}
@@ -2473,15 +2469,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
* @return TopicPolicies is exist else return null.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
- TopicName cloneTopicName = topicName;
- if (topicName.isPartitioned()) {
- cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
- }
try {
checkTopicLevelPolicyEnable();
- return pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ return pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+ log.debug("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
return null;
} catch (RestException | NullPointerException e) {
log.warn("Topic level policies are not enabled. " +
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 18388f1..3dc5a10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -92,7 +92,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
.domain(topicName.getDomain().toString())
.tenant(topicName.getTenant())
.namespace(topicName.getNamespaceObject().getLocalName())
- .topic(topicName.getLocalName())
+ .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
.policies(policies)
.build())
.build()).whenComplete(((messageId, e) -> {
@@ -148,7 +148,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
throw new TopicPoliciesCacheNotInitException();
}
- return policiesCache.get(topicName);
+ return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index a1f226f..efd1a33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -233,6 +234,30 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testBacklogQuotaWithPartitionedTopic() throws Exception {
+ final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 3);
+ pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
+ Awaitility.await()
+ .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ BacklogQuota backlogQuota = new BacklogQuota(1234, BacklogQuota.RetentionPolicy.producer_exception);
+ admin.topics().setBacklogQuota(topic, backlogQuota);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(topic)
+ .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
+
+ TopicPolicies topicPolicies0 = pulsar.getTopicPoliciesService().getTopicPolicies(
+ TopicName.get(topic).getPartition(0));
+ TopicPolicies topicPolicies1 = pulsar.getTopicPoliciesService().getTopicPolicies(
+ TopicName.get(topic).getPartition(1));
+ Assert.assertEquals(topicPolicies0, topicPolicies1);
+ Assert.assertEquals(topicPolicies0.getBackLogQuotaMap()
+ .get(BacklogQuota.BacklogQuotaType.destination_storage.name()).getLimit(), backlogQuota.getLimit());
+ Assert.assertEquals(topicPolicies0.getBackLogQuotaMap()
+ .get(BacklogQuota.BacklogQuotaType.destination_storage.name()).getPolicy(), backlogQuota.getPolicy());
+ }
+
+ @Test
public void testCheckRetention() throws Exception {
BacklogQuota backlogQuota =
new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 214fb1e..9801311 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -593,8 +593,6 @@ public class BrokerServiceTest extends BrokerTestBase {
} catch (Exception e) {
fail("should not fail");
- } finally {
- pulsarClient.close();
}
}