You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:56:51 UTC
[pulsar] 01/22: fix delete authentication policies when delete topic. (#12215)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f1d8d7197765277bacf67c8515827c241053223a
Author: Bowen Li <bw...@streamnative.io>
AuthorDate: Wed Oct 27 11:41:44 2021 +0800
fix delete authentication policies when delete topic. (#12215)
(cherry picked from commit 3e578280539aa55c084a35b601902f0e16c5fc2f)
---
.../broker/admin/impl/PersistentTopicsBase.java | 99 ++++++++++++++--------
.../pulsar/broker/service/BrokerService.java | 76 +++++++++++++++--
.../broker/service/persistent/PersistentTopic.java | 10 ++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 4 +-
.../api/AuthenticatedProducerConsumerTest.java | 80 +++++++++++++++--
5 files changed, 213 insertions(+), 56 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 62c01f6..f317e03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -132,6 +132,7 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,7 +270,7 @@ public class PersistentTopicsBase extends AdminResource {
});
log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
topicUri);
- } catch (org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+ } catch (MetadataStoreException.NotFoundException e) {
log.warn("[{}] Failed to grant permissions on topic {}: Namespace does not exist", clientAppId(), topicUri);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (Exception e) {
@@ -588,42 +589,72 @@ public class PersistentTopicsBase extends AdminResource {
}
});
}
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- pulsar().getAdminClient().topics()
- .deleteAsync(topicNamePartition.toString(), force)
- .whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof NotFoundException) {
- // if the sub-topic is not found, the client might not have called create
- // producer or it might have been deleted earlier,
- //so we ignore the 404 error.
- // For all other exception,
- //we fail the delete partition method even if a single
- // partition is failed to be deleted
- if (log.isDebugEnabled()) {
- log.debug("[{}] Partition not found: {}", clientAppId(),
- topicNamePartition);
+ // delete authentication policies of the partitioned topic
+ CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>();
+ pulsar().getPulsarResources().getNamespaceResources()
+ .setPoliciesAsync(topicName.getNamespaceObject(), p -> {
+ for (int i = 0; i < numPartitions; i++) {
+ p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
+ }
+ p.auth_policies.getTopicAuthentication().remove(topicName.toString());
+ return p;
+ }).thenAccept(v -> {
+ log.info("Successfully delete authentication policies for partitioned topic {}", topicName);
+ deleteAuthFuture.complete(null);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
+ log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
+ deleteAuthFuture.complete(null);
+ } else {
+ log.error("Failed to delete authentication policies for partitioned topic {}",
+ topicName, ex);
+ deleteAuthFuture.completeExceptionally(ex);
+ }
+ return null;
+ });
+
+ deleteAuthFuture.whenComplete((r, ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ return;
+ }
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ pulsar().getAdminClient().topics()
+ .deleteAsync(topicNamePartition.toString(), force)
+ .whenComplete((r1, ex1) -> {
+ if (ex1 != null) {
+ if (ex1 instanceof NotFoundException) {
+ // if the sub-topic is not found, the client might not have called
+ // create producer or it might have been deleted earlier,
+ //so we ignore the 404 error.
+ // For all other exception,
+ //we fail the delete partition method even if a single
+ // partition is failed to be deleted
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Partition not found: {}", clientAppId(),
+ topicNamePartition);
+ }
+ } else {
+ log.error("[{}] Failed to delete partition {}", clientAppId(),
+ topicNamePartition, ex1);
+ future.completeExceptionally(ex1);
+ return;
}
} else {
- log.error("[{}] Failed to delete partition {}", clientAppId(),
- topicNamePartition, ex);
- future.completeExceptionally(ex);
- return;
+ log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
}
- } else {
- log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
- }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
- });
- } catch (Exception e) {
- log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
- future.completeExceptionally(e);
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
+ });
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
+ future.completeExceptionally(e);
+ }
}
- }
+ });
} else {
future.complete(null);
}
@@ -2654,7 +2685,7 @@ public class PersistentTopicsBase extends AdminResource {
// note that we do not want to load the topic and hence skip authorization check
try {
namespaceResources().getPolicies(namespaceName);
- } catch (org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+ } catch (MetadataStoreException.NotFoundException e) {
log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0835eb0..76c295b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -971,6 +971,9 @@ public class BrokerService implements Closeable {
}
}
+ if (log.isDebugEnabled()) {
+ log.debug("Topic {} is not loaded, try to delete from metadata", topic);
+ }
// Topic is not loaded, though we still might be able to delete from metadata
TopicName tn = TopicName.get(topic);
if (!tn.isPersistent()) {
@@ -979,21 +982,76 @@ public class BrokerService implements Closeable {
}
CompletableFuture<Void> future = new CompletableFuture<>();
- managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- future.complete(null);
- }
- @Override
- public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- future.completeExceptionally(exception);
+ CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
+ deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
+ deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ return;
}
- }, null);
+ managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ future.complete(null);
+ }
+
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ });
+
return future;
}
+ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<Void> future, int count) {
+ if (count == 0) {
+ log.error("The number of retries has exhausted for topic {}", topic);
+ future.completeExceptionally(new MetadataStoreException("The number of retries has exhausted"));
+ return;
+ }
+ NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject();
+ // Check whether there are auth policies for the topic
+ pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies -> {
+ if (!optPolicies.isPresent() || !optPolicies.get().auth_policies.getTopicAuthentication()
+ .containsKey(topic)) {
+ // if there is no auth policy for the topic, just complete and return
+ if (log.isDebugEnabled()) {
+ log.debug("Authentication policies not found for topic {}", topic);
+ }
+ future.complete(null);
+ return;
+ }
+ pulsar.getPulsarResources().getNamespaceResources()
+ .setPoliciesAsync(TopicName.get(topic).getNamespaceObject(), p -> {
+ p.auth_policies.getTopicAuthentication().remove(topic);
+ return p;
+ }).thenAccept(v -> {
+ log.info("Successfully delete authentication policies for topic {}", topic);
+ future.complete(null);
+ }).exceptionally(ex1 -> {
+ if (ex1.getCause() instanceof MetadataStoreException.BadVersionException) {
+ log.warn(
+ "Failed to delete authentication policies because of bad version. "
+ + "Retry to delete authentication policies for topic {}",
+ topic);
+ deleteTopicAuthenticationWithRetry(topic, future, count - 1);
+ } else {
+ log.error("Failed to delete authentication policies for topic {}", topic, ex1);
+ future.completeExceptionally(ex1);
+ }
+ return null;
+ });
+ }).exceptionally(ex -> {
+ log.error("Failed to get policies for topic {}", topic, ex);
+ future.completeExceptionally(ex);
+ return null;
+ });
+ }
+
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f74060c..962edb7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -145,7 +145,6 @@ import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
@@ -1122,10 +1121,12 @@ public class PersistentTopic extends AbstractTopic
// 2. We want to kick out everyone and forcefully delete the topic.
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (currentUsageCount() == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
- CompletableFuture<SchemaVersion> deleteSchemaFuture =
- deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);
+ CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
+ brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
- deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies())
+ deleteTopicAuthenticationFuture.thenCompose(
+ __ -> deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null))
+ .thenAccept(__ -> deleteTopicPolicies())
.thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
@@ -1152,6 +1153,7 @@ public class PersistentTopic extends AbstractTopic
brokerService.pulsar().getTopicPoliciesService()
.clean(TopicName.get(topic));
+
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7e1c576..2c3f704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -782,9 +782,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
}
// Force topic creation and namespace being loaded
- producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
+ producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/ns2/my-topic").create();
producer.close();
- admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
+ admin.topics().delete("persistent://prop-xyz/ns2/my-topic");
// both unload and delete should succeed for ns2 on other broker with a redirect
// otheradmin.namespaces().unload("prop-xyz/use/ns2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 6d76ce8..046b268 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.client.api;
import static org.mockito.Mockito.spy;
-
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,20 +30,20 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import javax.ws.rs.InternalServerErrorException;
-
import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.zookeeper.KeeperException.Code;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -49,8 +52,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
-
@Test(groups = "broker-api")
public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);
@@ -87,9 +88,10 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
superUserRoles.add("admin");
conf.setSuperUserRoles(superUserRoles);
+ conf.setBrokerClientTlsEnabled(true);
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
conf.setBrokerClientAuthenticationParameters(
- "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+ "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
@@ -337,4 +339,68 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
mockZooKeeperGlobal.unsetAlwaysFail();
}
+ @Test
+ public void testDeleteAuthenticationPoliciesOfTopic() throws Exception {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ Authentication authTls = new AuthenticationTls();
+ authTls.configure(authParams);
+ internalSetup(authTls);
+
+ admin.clusters().createCluster("test", ClusterData.builder().build());
+ admin.tenants().createTenant("p1",
+ new TenantInfoImpl(Collections.emptySet(), new HashSet<>(admin.clusters().getClusters())));
+ admin.namespaces().createNamespace("p1/ns1");
+
+ // test for non-partitioned topic
+ String topic = "persistent://p1/ns1/topic";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().grantPermission(topic, "test-user", EnumSet.of(AuthAction.consume));
+
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+ .get().auth_policies.getTopicAuthentication().containsKey(topic));
+ });
+
+ admin.topics().delete(topic);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+ .get().auth_policies.getTopicAuthentication().containsKey(topic));
+ });
+
+ // test for partitioned topic
+ String partitionedTopic = "persistent://p1/ns1/partitioned-topic";
+ int numPartitions = 5;
+
+ admin.topics().createPartitionedTopic(partitionedTopic, numPartitions);
+ admin.topics()
+ .grantPermission(partitionedTopic, "test-user", EnumSet.of(AuthAction.consume));
+
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+ .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
+ for (int i = 0; i < numPartitions; i++) {
+ assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+ .get().auth_policies.getTopicAuthentication()
+ .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
+ }
+ });
+
+ admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic");
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+ .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
+ for (int i = 0; i < numPartitions; i++) {
+ assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+ .get().auth_policies.getTopicAuthentication()
+ .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
+ }
+ });
+
+ admin.namespaces().deleteNamespace("p1/ns1");
+ admin.tenants().deleteTenant("p1");
+ admin.clusters().deleteCluster("test");
+ }
}