You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:56:51 UTC

[pulsar] 01/22: fix delete authentication policies when delete topic. (#12215)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f1d8d7197765277bacf67c8515827c241053223a
Author: Bowen Li <bw...@streamnative.io>
AuthorDate: Wed Oct 27 11:41:44 2021 +0800

    fix delete authentication policies when delete topic. (#12215)
    
    (cherry picked from commit 3e578280539aa55c084a35b601902f0e16c5fc2f)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 99 ++++++++++++++--------
 .../pulsar/broker/service/BrokerService.java       | 76 +++++++++++++++--
 .../broker/service/persistent/PersistentTopic.java | 10 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  4 +-
 .../api/AuthenticatedProducerConsumerTest.java     | 80 +++++++++++++++--
 5 files changed, 213 insertions(+), 56 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 62c01f6..f317e03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -132,6 +132,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -269,7 +270,7 @@ public class PersistentTopicsBase extends AdminResource {
             });
             log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
                     topicUri);
-        } catch (org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+        } catch (MetadataStoreException.NotFoundException e) {
             log.warn("[{}] Failed to grant permissions on topic {}: Namespace does not exist", clientAppId(), topicUri);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (Exception e) {
@@ -588,42 +589,72 @@ public class PersistentTopicsBase extends AdminResource {
                                 }
                             });
                 }
-                for (int i = 0; i < numPartitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        pulsar().getAdminClient().topics()
-                                .deleteAsync(topicNamePartition.toString(), force)
-                                .whenComplete((r, ex) -> {
-                                    if (ex != null) {
-                                        if (ex instanceof NotFoundException) {
-                                            // if the sub-topic is not found, the client might not have called create
-                                            // producer or it might have been deleted earlier,
-                                            //so we ignore the 404 error.
-                                            // For all other exception,
-                                            //we fail the delete partition method even if a single
-                                            // partition is failed to be deleted
-                                            if (log.isDebugEnabled()) {
-                                                log.debug("[{}] Partition not found: {}", clientAppId(),
-                                                        topicNamePartition);
+                // delete authentication policies of the partitioned topic
+                CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>();
+                pulsar().getPulsarResources().getNamespaceResources()
+                        .setPoliciesAsync(topicName.getNamespaceObject(), p -> {
+                            for (int i = 0; i < numPartitions; i++) {
+                                p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
+                            }
+                            p.auth_policies.getTopicAuthentication().remove(topicName.toString());
+                            return p;
+                        }).thenAccept(v -> {
+                            log.info("Successfully delete authentication policies for partitioned topic {}", topicName);
+                            deleteAuthFuture.complete(null);
+                        }).exceptionally(ex -> {
+                            if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
+                                log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
+                                deleteAuthFuture.complete(null);
+                            } else {
+                                log.error("Failed to delete authentication policies for partitioned topic {}",
+                                        topicName, ex);
+                                deleteAuthFuture.completeExceptionally(ex);
+                            }
+                            return null;
+                        });
+
+                deleteAuthFuture.whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                        return;
+                    }
+                    for (int i = 0; i < numPartitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            pulsar().getAdminClient().topics()
+                                    .deleteAsync(topicNamePartition.toString(), force)
+                                    .whenComplete((r1, ex1) -> {
+                                        if (ex1 != null) {
+                                            if (ex1 instanceof NotFoundException) {
+                                                // if the sub-topic is not found, the client might not have called
+                                                // create producer or it might have been deleted earlier,
+                                                //so we ignore the 404 error.
+                                                // For all other exception,
+                                                //we fail the delete partition method even if a single
+                                                // partition is failed to be deleted
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("[{}] Partition not found: {}", clientAppId(),
+                                                            topicNamePartition);
+                                                }
+                                            } else {
+                                                log.error("[{}] Failed to delete partition {}", clientAppId(),
+                                                        topicNamePartition, ex1);
+                                                future.completeExceptionally(ex1);
+                                                return;
                                             }
                                         } else {
-                                            log.error("[{}] Failed to delete partition {}", clientAppId(),
-                                                    topicNamePartition, ex);
-                                            future.completeExceptionally(ex);
-                                            return;
+                                            log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
                                         }
-                                    } else {
-                                        log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
-                                    }
-                                    if (count.decrementAndGet() == 0) {
-                                        future.complete(null);
-                                    }
-                                });
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
-                        future.completeExceptionally(e);
+                                        if (count.decrementAndGet() == 0) {
+                                            future.complete(null);
+                                        }
+                                    });
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
+                            future.completeExceptionally(e);
+                        }
                     }
-                }
+                });
             } else {
                 future.complete(null);
             }
@@ -2654,7 +2685,7 @@ public class PersistentTopicsBase extends AdminResource {
         // note that we do not want to load the topic and hence skip authorization check
         try {
             namespaceResources().getPolicies(namespaceName);
-        } catch (org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+        } catch (MetadataStoreException.NotFoundException e) {
             log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0835eb0..76c295b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -971,6 +971,9 @@ public class BrokerService implements Closeable {
             }
         }
 
+        if (log.isDebugEnabled()) {
+            log.debug("Topic {} is not loaded, try to delete from metadata", topic);
+        }
         // Topic is not loaded, though we still might be able to delete from metadata
         TopicName tn = TopicName.get(topic);
         if (!tn.isPersistent()) {
@@ -979,21 +982,76 @@ public class BrokerService implements Closeable {
         }
 
         CompletableFuture<Void> future = new CompletableFuture<>();
-        managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() {
-            @Override
-            public void deleteLedgerComplete(Object ctx) {
-                future.complete(null);
-            }
 
-            @Override
-            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                future.completeExceptionally(exception);
+        CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
+        deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
+        deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+            if (ex != null) {
+                future.completeExceptionally(ex);
+                return;
             }
-        }, null);
+            managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() {
+                @Override
+                public void deleteLedgerComplete(Object ctx) {
+                    future.complete(null);
+                }
+
+                @Override
+                public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                    future.completeExceptionally(exception);
+                }
+            }, null);
+        });
+
 
         return future;
     }
 
+    public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<Void> future, int count) {
+        if (count == 0) {
+            log.error("The number of retries has exhausted for topic {}", topic);
+            future.completeExceptionally(new MetadataStoreException("The number of retries has exhausted"));
+            return;
+        }
+        NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject();
+        // Check whether there are auth policies for the topic
+        pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies -> {
+            if (!optPolicies.isPresent() || !optPolicies.get().auth_policies.getTopicAuthentication()
+                    .containsKey(topic)) {
+                // if there is no auth policy for the topic, just complete and return
+                if (log.isDebugEnabled()) {
+                    log.debug("Authentication policies not found for topic {}", topic);
+                }
+                future.complete(null);
+                return;
+            }
+            pulsar.getPulsarResources().getNamespaceResources()
+                    .setPoliciesAsync(TopicName.get(topic).getNamespaceObject(), p -> {
+                        p.auth_policies.getTopicAuthentication().remove(topic);
+                        return p;
+                    }).thenAccept(v -> {
+                        log.info("Successfully delete authentication policies for topic {}", topic);
+                        future.complete(null);
+                    }).exceptionally(ex1 -> {
+                        if (ex1.getCause() instanceof MetadataStoreException.BadVersionException) {
+                            log.warn(
+                                    "Failed to delete authentication policies because of bad version. "
+                                            + "Retry to delete authentication policies for topic {}",
+                                    topic);
+                            deleteTopicAuthenticationWithRetry(topic, future, count - 1);
+                        } else {
+                            log.error("Failed to delete authentication policies for topic {}", topic, ex1);
+                            future.completeExceptionally(ex1);
+                        }
+                        return null;
+                    });
+        }).exceptionally(ex -> {
+            log.error("Failed to get policies for topic {}", topic, ex);
+            future.completeExceptionally(ex);
+            return null;
+        });
+    }
+
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
         CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f74060c..962edb7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -145,7 +145,6 @@ import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -1122,10 +1121,12 @@ public class PersistentTopic extends AbstractTopic
                 //  2. We want to kick out everyone and forcefully delete the topic.
                 //     In this case, we shouldn't care if the usageCount is 0 or not, just proceed
                 if (currentUsageCount() ==  0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
-                    CompletableFuture<SchemaVersion> deleteSchemaFuture =
-                            deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);
+                    CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
+                    brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
 
-                    deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies())
+                    deleteTopicAuthenticationFuture.thenCompose(
+                                    __ -> deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null))
+                            .thenAccept(__ -> deleteTopicPolicies())
                             .thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Error deleting topic", topic, ex);
@@ -1152,6 +1153,7 @@ public class PersistentTopic extends AbstractTopic
 
                                             brokerService.pulsar().getTopicPoliciesService()
                                                     .clean(TopicName.get(topic));
+
                                             log.info("[{}] Topic deleted", topic);
                                             deleteFuture.complete(null);
                                         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7e1c576..2c3f704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -782,9 +782,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         }
 
         // Force topic creation and namespace being loaded
-        producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
+        producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/ns2/my-topic").create();
         producer.close();
-        admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
+        admin.topics().delete("persistent://prop-xyz/ns2/my-topic");
 
         // both unload and delete should succeed for ns2 on other broker with a redirect
         // otheradmin.namespaces().unload("prop-xyz/use/ns2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 6d76ce8..046b268 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import static org.mockito.Mockito.spy;
-
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,20 +30,20 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import javax.ws.rs.InternalServerErrorException;
-
 import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.zookeeper.KeeperException.Code;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -49,8 +52,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
 @Test(groups = "broker-api")
 public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);
@@ -87,9 +88,10 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         superUserRoles.add("admin");
         conf.setSuperUserRoles(superUserRoles);
 
+        conf.setBrokerClientTlsEnabled(true);
         conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         conf.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
@@ -337,4 +339,68 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         mockZooKeeperGlobal.unsetAlwaysFail();
     }
 
+    @Test
+    public void testDeleteAuthenticationPoliciesOfTopic() throws Exception {
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+        internalSetup(authTls);
+
+        admin.clusters().createCluster("test", ClusterData.builder().build());
+        admin.tenants().createTenant("p1",
+                new TenantInfoImpl(Collections.emptySet(), new HashSet<>(admin.clusters().getClusters())));
+        admin.namespaces().createNamespace("p1/ns1");
+
+        // test for non-partitioned topic
+        String topic = "persistent://p1/ns1/topic";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().grantPermission(topic, "test-user", EnumSet.of(AuthAction.consume));
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    .get().auth_policies.getTopicAuthentication().containsKey(topic));
+        });
+
+        admin.topics().delete(topic);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    .get().auth_policies.getTopicAuthentication().containsKey(topic));
+        });
+
+        // test for partitioned topic
+        String partitionedTopic = "persistent://p1/ns1/partitioned-topic";
+        int numPartitions = 5;
+
+        admin.topics().createPartitionedTopic(partitionedTopic, numPartitions);
+        admin.topics()
+                .grantPermission(partitionedTopic, "test-user", EnumSet.of(AuthAction.consume));
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
+            for (int i = 0; i < numPartitions; i++) {
+                assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                        .get().auth_policies.getTopicAuthentication()
+                        .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
+            }
+        });
+
+        admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic");
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
+            for (int i = 0; i < numPartitions; i++) {
+                assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                        .get().auth_policies.getTopicAuthentication()
+                        .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
+            }
+        });
+
+        admin.namespaces().deleteNamespace("p1/ns1");
+        admin.tenants().deleteTenant("p1");
+        admin.clusters().deleteCluster("test");
+    }
 }