You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/22 08:44:38 UTC
[pulsar] branch master updated: [fix][broker] Make `deleteTopicPolicies` execute serially when closing a topic. (#15811)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e8ee996dd0c [fix][broker] Make `deleteTopicPolicies` execute serially when closing a topic. (#15811)
e8ee996dd0c is described below
commit e8ee996dd0c7a3a742117aee399b31e89e6e2d9d
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Mon Aug 22 16:44:31 2022 +0800
[fix][broker] Make `deleteTopicPolicies` execute serially when closing a topic. (#15811)
---
.../org/apache/pulsar/broker/service/BrokerService.java | 15 ++++++++++++---
.../pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
.../client/api/AuthenticatedProducerConsumerTest.java | 1 +
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e99fce6f68a..5d3a034b21c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3013,11 +3013,20 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
- if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+ final PulsarService pulsarService = pulsar();
+ if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
return CompletableFuture.completedFuture(null);
}
- TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
- return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+ return pulsarService.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenComposeAsync(optPolicies -> {
+ if (optPolicies.isPresent() && optPolicies.get().deleted) {
+ // We can return the completed future directly if the namespace is already deleted.
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+ return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+ });
}
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 87a122d3c7a..373aeb0d8e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1170,7 +1170,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema())
- .thenAccept(__ -> deleteTopicPolicies())
+ .thenCompose(__ -> deleteTopicPolicies())
.thenCompose(__ -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
if (ex != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 4cb8e71c1ab..5a4f5dd4877 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -92,6 +92,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
+ conf.setTopicLevelPoliciesEnabled(false);
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("localhost");