Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/05 09:56:31 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] fix can not revoke permission after update topic partition (#17393)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 2d0190f9e65 [fix][broker] fix can not revoke permission after update topic partition (#17393)
2d0190f9e65 is described below
commit 2d0190f9e659650aa385f945ff09a882411e324c
Author: ken <16...@qq.com>
AuthorDate: Mon Sep 5 10:59:08 2022 +0800
[fix][broker] fix can not revoke permission after update topic partition (#17393)
* add unittest for revoke permission of topic after update topic partition
* fix revoke permission of partition
Co-authored-by: fanjianye <fa...@bigo.sg>
(cherry picked from commit 8c4aad5d2d5811f5eef220692a62d38a70d52988)
---
.../broker/admin/impl/PersistentTopicsBase.java | 14 +++++++++-----
.../server/ProxyWithJwtAuthorizationTest.java | 22 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f0f8e6a17b1..dbcb44a5f38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -320,7 +320,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
+ private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role, boolean force) {
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
policiesOptional -> {
Policies policies = policiesOptional.orElseThrow(() ->
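Review bot: The new `force` parameter on `revokePermissionsAsync` is undocumented, and its name suggests something stronger than what the visible code does with it. In the next hunk it only decides whether a missing topic-level entry is treated as success or as a 412. A short Javadoc on the method would prevent misuse, for example `@param force if true, complete normally when the role has no topic-level permission on topicUri instead of failing with PRECONDITION_FAILED`. A name such as `ignoreMissing` would also read more clearly at the two call sites than a bare `true`/`false`. The method body continues beyond this hunk.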
@@ -329,8 +329,12 @@ public class PersistentTopicsBase extends AdminResource {
|| !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
clientAppId(), role, topicUri);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
- "Permissions are not set at the topic level"));
+ if (force) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+ "Permissions are not set at the topic level"));
+ }
}
// Write the new policies to metadata store
return namespaceResources().setPoliciesAsync(namespaceName, p -> {
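Review bot: The `log.warn("[{}] Failed to revoke permission ...")` call sits above the new `if (force)` check, so it still fires for every partition that has no topic-level entry even though the call then completes successfully. After a partition expansion that is the normal case for the new partitions, so each revoke adds several "Failed to revoke" warnings to the broker log for an operation that succeeded. That makes the log harder to use as an audit trail for permission changes. Move the warn into the `else` branch and log the skipped case at a lower level, e.g. `log.debug("[{}] No topic-level permission for role {} on {}, skipping", clientAppId(), role, topicUri);`. The condition this block belongs to starts above the hunk and the method continues below it.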
@@ -356,10 +360,10 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenComposeAsync(unused ->
- revokePermissionsAsync(topicNamePartition.toString(), role));
+ revokePermissionsAsync(topicNamePartition.toString(), role, true));
}
}
- return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
+ return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role, false))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))
).exceptionally(ex -> {
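Review bot: Revocation here is a chain of independent policy writes, one `revokePermissionsAsync` call per partition followed by one for the base topic, so it is not atomic. If any step fails, the caller gets an error while the role has been removed from some partitions and is still granted on the remaining ones and on the base topic. The same happens when the final call with `force=false` returns 412 because the base topic has no entry. For a revoke, the failure mode that matters is access that silently stays in place, so the behaviour should at least be stated at the loop, e.g. `// Not atomic: a failure mid-chain leaves the role granted on the remaining partitions and the base topic; callers must retry until 204.` It would be safer to remove the role from the base topic and all partitions inside a single `setPoliciesAsync` update, though the body of that lambda is outside this hunk, so whether that fits is to be confirmed. The enclosing method starts and ends outside the hunk.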
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index ff8f45d50f8..07b71530b53 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -209,6 +209,8 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
* 2. Update the topic partition number to 4.
* 3. Use new producer/consumer with client role to process the topic.
* 4. Broker should authorize producer/consumer normally.
+ * 5. revoke produce/consumer permission of topic
+ * 6. new producer/consumer should not be authorized
* </pre>
*/
@Test
@@ -290,6 +292,26 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
Assert.assertEquals(messageSet, receivedMessageSet);
consumer.close();
producer.close();
+
+ // revoke produce/consume permission
+ admin.topics().revokePermissions(topicName, CLIENT_ROLE);
+
+ // produce/consume the topic should fail
+ try {
+ consumer = proxyClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName).subscribe();
+ Assert.fail("Should not pass");
+ } catch (PulsarClientException.AuthorizationException ex) {
+ // ok
+ }
+ try {
+ producer = proxyClient.newProducer(Schema.BYTES)
+ .topic(topicName).create();
+ Assert.fail("Should not pass");
+ } catch (PulsarClientException.AuthorizationException ex) {
+ // ok
+ }
log.info("-- Exiting {} test --", methodName);
}
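Review bot: The added assertions only check that a new consumer and producer are rejected through the proxy; they do not check that the policy entries were actually removed. After `revokePermissions`, an assertion on the stored permissions would pin the fix more directly, e.g. `Assert.assertFalse(admin.topics().getPermissions(topicName).containsKey(CLIENT_ROLE));`, ideally repeated for a partition name created by the update to 4 partitions. That would also separate "grant removed" from "denied for another reason", such as the proxy connection failing authorization on its own. Separately, if `subscribe()` or `create()` unexpectedly succeeds, `Assert.fail` throws and the new consumer or producer is never closed. The test method starts above this hunk.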