You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2023/11/19 11:56:34 UTC

(pulsar) branch branch-2.11 updated: [fix][broker] Allow Access to System Topic Metadata for Reader Creation Post-Namespace Deletion (#20304)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 6491a7948ea [fix][broker] Allow Access to System Topic Metadata for Reader Creation Post-Namespace Deletion (#20304)
6491a7948ea is described below

commit 6491a7948ea383ec35ddb2e9d5fc8ffc7a1a2a61
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 12 19:29:42 2023 +0800

    [fix][broker] Allow Access to System Topic Metadata for Reader Creation Post-Namespace Deletion (#20304)
    
    Since the snapshot segment feature was introduced, deleting topics requires readers. Furthermore, these readers should be created when needed and closed after use, which means we should not pre-store readers. However, once namespace deletion has been initiated, it is currently not allowed to fetch partitioned-topic metadata or perform lookups, which makes it impossible to create readers. As a result, the namespace cannot be deleted.
    
    Allow system topic metadata to be fetched after namespace deletion has been initiated, so that readers can be created to clean up topic data.
    
    (cherry picked from commit 5d5ec947249df50bf35a78b6a2a0d3b00d97ca66)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 14 ++++++-
 .../pulsar/broker/lookup/TopicLookupBase.java      | 44 ++++++++++++----------
 2 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 03a18576fe7..6a10a173edd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
 import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
 import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
 import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
@@ -4271,8 +4272,13 @@ public class PersistentTopicsBase extends AdminResource {
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
+        // It is necessary for system topic operations because system topics are used to store metadata
+        // and other vital information. Even after namespace deletion has started,
+        // we need to access the metadata of system topics to create readers and clean up topic data.
+        // If we don't do this, it can prevent namespace deletion due to inaccessible readers.
         authorizationFuture.thenCompose(__ ->
-                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(),
+                                SystemTopicNames.isSystemTopic(topicName)))
                 .thenCompose(res ->
                         pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
                 .thenAccept(metadata -> {
@@ -4299,7 +4305,11 @@ public class PersistentTopicsBase extends AdminResource {
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
-        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
+        // It is necessary for system topic operations because system topics are used to store metadata
+        // and other vital information. Even after namespace deletion has started,
+        // we need to access the metadata of system topics to create readers and clean up topic data.
+        // If we don't do this, it can prevent namespace deletion due to inaccessible readers.
+        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName))
             .thenCompose(res -> pulsar.getBrokerService()
                 .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
             .thenAccept(metadata -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 819dc5cb555..7cdcfb3fc28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -228,26 +229,31 @@ public class TopicLookupBase extends PulsarWebResource {
                 // (2) authorize client
                 checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
                         // (3) validate global namespace
+                        // It is necessary for system topic operations because system topics are used to store metadata
+                        // and other vital information. Even after namespace deletion has started,
+                        // we need to access the metadata of system topics to create readers and clean up topic data.
+                        // If we don't do this, it can prevent namespace deletion due to inaccessible readers.
                         checkLocalOrGetPeerReplicationCluster(pulsarService,
-                                topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
-                            if (peerClusterData == null) {
-                                // (4) all validation passed: initiate lookup
-                                validationFuture.complete(null);
-                                return;
-                            }
-                            // if peer-cluster-data is present it means namespace is owned by that peer-cluster and
-                            // request should be redirect to the peer-cluster
-                            if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
-                                    && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
-                                validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService url is not configured",
-                                        requestId));
-                                return;
-                            }
-                            validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
-                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
-                                    requestId,
-                                    false));
+                                topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName))
+                                .thenAccept(peerClusterData -> {
+                                    if (peerClusterData == null) {
+                                        // (4) all validation passed: initiate lookup
+                                        validationFuture.complete(null);
+                                        return;
+                                    }
+                                    // if peer-cluster-data is present it means namespace is owned by that peer-cluster
+                                    // and request should be redirect to the peer-cluster
+                                    if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
+                                            && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
+                                        validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
+                                                "Redirected cluster's brokerService url is not configured",
+                                                requestId));
+                                        return;
+                                    }
+                                    validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
+                                            peerClusterData.getBrokerServiceUrlTls(), true,
+                                            LookupType.Redirect, requestId,
+                                            false));
                         }).exceptionally(ex -> {
                             validationFuture.complete(
                                     newLookupErrorResponse(ServerError.MetadataError,