You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/24 00:39:15 UTC
[pulsar] branch master updated: [Broker] make grantPermissionsOnTopic method async (#14152)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1344b33e411 [Broker] make grantPermissionsOnTopic method async (#14152)
1344b33e411 is described below
commit 1344b33e411e234b55595307dc6c1e3be14b8a55
Author: Dezhi LIiu <33...@users.noreply.github.com>
AuthorDate: Sun Apr 24 08:39:09 2022 +0800
[Broker] make grantPermissionsOnTopic method async (#14152)
---
.../broker/admin/impl/PersistentTopicsBase.java | 95 ++++++++++++----------
.../pulsar/broker/admin/v1/PersistentTopics.java | 18 ++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 11 ++-
.../org/apache/pulsar/broker/admin/AdminTest.java | 6 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 26 +++++-
5 files changed, 101 insertions(+), 55 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index addf02f8005..909c7de8d74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -256,53 +256,62 @@ public class PersistentTopicsBase extends AdminResource {
validateTopicOwnership(topicName, authoritative);
}
- private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
- try {
- AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
- if (null != authService) {
- authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
- } else {
- throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
- }
- log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
- topicUri);
- } catch (InterruptedException e) {
- log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
- throw new RestException(e);
- } catch (ExecutionException e) {
- // The IllegalArgumentException and the IllegalStateException were historically thrown by the
- // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
- if (e.getCause() instanceof MetadataStoreException.NotFoundException
- || e.getCause() instanceof IllegalArgumentException) {
- log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
- topicUri, e);
- throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
- } else if (e.getCause() instanceof MetadataStoreException.BadVersionException
- || e.getCause() instanceof IllegalStateException) {
- log.warn("[{}] Failed to set permissions for topic {}: {}",
- clientAppId(), topicUri, e.getCause().getMessage(), e);
- throw new RestException(Status.CONFLICT, "Concurrent modification");
- } else {
- log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
- throw new RestException(e);
- }
+ private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, String role, Set<AuthAction> actions) {
+ AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
+ .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
+ clientAppId(), role, actions, topicUri))
+ .exceptionally(ex -> {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
+ if (realCause instanceof MetadataStoreException.NotFoundException
+ || realCause instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
+ clientAppId(), topicUri, realCause);
+ throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
+ } else if (realCause instanceof MetadataStoreException.BadVersionException
+ || realCause instanceof IllegalStateException) {
+ log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
+ realCause.getMessage(), realCause);
+ throw new RestException(Status.CONFLICT, "Concurrent modification");
+ } else {
+ log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
+ realCause);
+ throw new RestException(realCause);
+ }
+ });
+ } else {
+ String msg = "Authorization is not enabled";
+ return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
}
}
- protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
+ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
+ Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
- validateAdminAccessForTenant(namespaceName.getTenant());
- validatePoliciesReadOnlyAccess();
-
- PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
- int numPartitions = meta.partitions;
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- grantPermissions(topicNamePartition, role, actions);
- }
- }
- grantPermissions(topicName, role, actions);
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+ getPartitionedTopicMetadataAsync(topicName, true, false)
+ .thenCompose(metadata -> {
+ int numPartitions = metadata.partitions;
+ CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+ if (numPartitions > 0) {
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
+ actions));
+ }
+ }
+ return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
+ .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
+ }))).exceptionally(ex -> {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
+ resumeAsyncResponseExceptionally(asyncResponse, realCause);
+ return null;
+ });
}
protected void internalDeleteTopicForcefully(boolean authoritative, boolean deleteSchema) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8353756dce0..c24e0c34b40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -122,12 +122,20 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
- public void grantPermissionsOnTopic(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role,
+ public void grantPermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+ @PathParam("role") String role,
Set<AuthAction> actions) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalGrantPermissionsOnTopic(role, actions);
+
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f640b1ca6de..3f12df47f4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -163,6 +163,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void grantPermissionsOnTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -174,8 +175,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Actions to be granted (produce,functions,consume)",
allowableValues = "produce,functions,consume")
Set<AuthAction> actions) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGrantPermissionsOnTopic(role, actions);
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 91ec9adecc7..11bed0583b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -780,7 +780,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// grant permission
final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
final String role = "test-role";
- persistentTopics.grantPermissionsOnTopic(property, cluster, namespace, topic, role, actions);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.grantPermissionsOnTopic(response, property, cluster, namespace, topic, role, actions);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// verify permission
Map<String, Set<AuthAction>> permission = persistentTopics.getPermissionsOnTopic(property, cluster,
namespace, topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 22014fc37e4..6c72e19437f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -623,7 +623,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
- persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
Assert.assertEquals(permissions.get(role), expectActions);
}
@@ -659,7 +663,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
- persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
+ expectActions);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
partitionedTopicName);
Assert.assertEquals(permissions.get(role), expectActions);
@@ -680,9 +689,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
- persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, topicName, role);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -703,7 +716,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
- persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
+ expectActions);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
responseCaptor = ArgumentCaptor.forClass(Response.class);