You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/05/12 11:29:55 UTC

[pulsar] branch master updated: [fix][broker] Allow Access to System Topic Metadata for Reader Creation Post-Namespace Deletion (#20304)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d5ec947249 [fix][broker] Allow Access to System Topic Metadata for Reader Creation Post-Namespace Deletion (#20304)
5d5ec947249 is described below

commit 5d5ec947249df50bf35a78b6a2a0d3b00d97ca66
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 12 19:29:42 2023 +0800

    [fix][broker] Allow Access to System Topic Metadata for Reader Creation Post-Namespace Deletion (#20304)
    
    ## Motivation
    Once the segmented snapshot feature is enabled, deleting a topic requires opening readers. These readers should be created when they are needed and closed after use, which means we should not cache them in advance. However, once namespace deletion has started, fetching partitioned-topic metadata and performing lookups are currently not allowed, so the readers cannot be created. As a result, the namespace cannot be deleted.
    
    ## Modification
    Allow system topic metadata to be fetched after namespace deletion has started, so that readers can be created to clean up topic data.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 14 ++++++-
 .../pulsar/broker/lookup/TopicLookupBase.java      | 44 ++++++++++++----------
 .../pulsar/broker/transaction/TransactionTest.java | 21 +++++++++++
 3 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9dcbe13d615..ed7cd70c641 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
 import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
 import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
 import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
@@ -4408,8 +4409,13 @@ public class PersistentTopicsBase extends AdminResource {
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
+        // It is necessary for system topic operations because system topics are used to store metadata
+        // and other vital information. Even after namespace deletion has started,
+        // we need to access the metadata of system topics to create readers and clean up topic data.
+        // If we don't do this, it can prevent namespace deletion due to inaccessible readers.
         authorizationFuture.thenCompose(__ ->
-                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(),
+                                SystemTopicNames.isSystemTopic(topicName)))
                 .thenCompose(res ->
                         pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
                 .thenAccept(metadata -> {
@@ -4436,7 +4442,11 @@ public class PersistentTopicsBase extends AdminResource {
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
-        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
+        // It is necessary for system topic operations because system topics are used to store metadata
+        // and other vital information. Even after namespace deletion has started,
+        // we need to access the metadata of system topics to create readers and clean up topic data.
+        // If we don't do this, it can prevent namespace deletion due to inaccessible readers.
+        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName))
             .thenCompose(res -> pulsar.getBrokerService()
                 .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
             .thenAccept(metadata -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 3b64d2a9f83..bd70201cba5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -221,26 +222,31 @@ public class TopicLookupBase extends PulsarWebResource {
                 // (2) authorize client
                 checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
                         // (3) validate global namespace
+                        // It is necessary for system topic operations because system topics are used to store metadata
+                        // and other vital information. Even after namespace starting deletion,
+                        // we need to access the metadata of system topics to create readers and clean up topic data.
+                        // If we don't do this, it can prevent namespace deletion due to inaccessible readers.
                         checkLocalOrGetPeerReplicationCluster(pulsarService,
-                                topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
-                            if (peerClusterData == null) {
-                                // (4) all validation passed: initiate lookup
-                                validationFuture.complete(null);
-                                return;
-                            }
-                            // if peer-cluster-data is present it means namespace is owned by that peer-cluster and
-                            // request should be redirect to the peer-cluster
-                            if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
-                                    && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
-                                validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService url is not configured",
-                                        requestId));
-                                return;
-                            }
-                            validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
-                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
-                                    requestId,
-                                    false));
+                                topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName))
+                                .thenAccept(peerClusterData -> {
+                                    if (peerClusterData == null) {
+                                        // (4) all validation passed: initiate lookup
+                                        validationFuture.complete(null);
+                                        return;
+                                    }
+                                    // if peer-cluster-data is present it means namespace is owned by that peer-cluster
+                                    // and request should be redirect to the peer-cluster
+                                    if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
+                                            && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
+                                        validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
+                                                "Redirected cluster's brokerService url is not configured",
+                                                requestId));
+                                        return;
+                                    }
+                                    validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
+                                            peerClusterData.getBrokerServiceUrlTls(), true,
+                                            LookupType.Redirect, requestId,
+                                            false));
                         }).exceptionally(ex -> {
                             Throwable throwable = FutureUtil.unwrapCompletionException(ex);
                             if (throwable instanceof RestException restException){
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c3533e70cf8..c4ec2ec766e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -272,6 +272,27 @@ public class TransactionTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void testCanDeleteNamespaceWhenEnableTxnSegmentedSnapshot() throws Exception {
+        // Enable the segmented snapshot feature
+        pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        pulsarServiceList.get(0).getConfig().setForceDeleteNamespaceAllowed(true);
+
+        // Create a new namespace
+        String namespaceName =  TENANT + "/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic";
+        admin.namespaces().createNamespace(namespaceName);
+
+        // Create a new topic in the namespace
+        String topicName = "persistent://" + namespaceName + "/newTopic";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        producer.close();
+
+        // Destroy the namespace after the test
+        admin.namespaces().deleteNamespace(namespaceName, true);
+        pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+    }
+
     @Test
     public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
         String subName = "test";