You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 13:24:53 UTC

[pulsar] branch branch-2.10 updated (a1d327e5378 -> 7c88bd1b20b)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from a1d327e5378 [fix] [admin] fix reach max tenants error if the tenant already exists (#15961)
     new 4a82c5be8f2 [cleanup][function] refine file io connector (#15250)
     new a7f3efa43ab [Broker]make revokePermissionsOnTopic method async (#14149)
     new 7c88bd1b20b [fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/authorization/AuthorizationService.java |  20 ++-
 .../broker/admin/impl/PersistentTopicsBase.java    | 171 ++++++++++----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  17 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  11 +-
 .../pulsar/broker/lookup/TopicLookupBase.java      |  54 ++++---
 .../pulsar/broker/web/PulsarWebResource.java       | 173 +++++++++++++--------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   8 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  13 +-
 .../apache/pulsar/io/file/FileListingThread.java   |   6 +-
 .../apache/pulsar/io/file/FileSourceConfig.java    |   4 +-
 .../pulsar/io/file/FileSourceConfigTests.java      |  44 +++++-
 11 files changed, 325 insertions(+), 196 deletions(-)


[pulsar] 03/03: [fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c88bd1b20bb57bb420b9eb4f30946afcc1d83cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 9 09:06:28 2022 +0800

    [fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)
    
    (cherry picked from commit 6af365e36aed74e95ca6e088f453d9513094bb36)
---
 .../broker/authorization/AuthorizationService.java | 20 +++--
 .../broker/admin/impl/PersistentTopicsBase.java    | 88 ++++++++++++----------
 .../pulsar/broker/lookup/TopicLookupBase.java      | 54 +++++++------
 .../pulsar/broker/web/PulsarWebResource.java       | 19 +++--
 4 files changed, 101 insertions(+), 80 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 23f650678e8..3baaf57990a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -357,10 +357,11 @@ public class AuthorizationService {
                                         TenantOperation operation,
                                         String originalRole,
                                         String role,
-                                        AuthenticationDataSource authData) {
+                                        AuthenticationDataSource authData) throws Exception {
         try {
             return allowTenantOperationAsync(
-                    tenantName, operation, originalRole, role, authData).get();
+                    tenantName, operation, originalRole, role, authData).get(
+                            conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -455,10 +456,11 @@ public class AuthorizationService {
                                                  PolicyOperation operation,
                                                  String originalRole,
                                                  String role,
-                                                 AuthenticationDataSource authData) {
+                                                 AuthenticationDataSource authData) throws Exception {
         try {
             return allowNamespacePolicyOperationAsync(
-                    namespaceName, policy, operation, originalRole, role, authData).get();
+                    namespaceName, policy, operation, originalRole, role, authData).get(
+                            conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -516,10 +518,11 @@ public class AuthorizationService {
                                              PolicyOperation operation,
                                              String originalRole,
                                              String role,
-                                             AuthenticationDataSource authData) {
+                                             AuthenticationDataSource authData) throws Exception {
         try {
             return allowTopicPolicyOperationAsync(
-                    topicName, policy, operation, originalRole, role, authData).get();
+                    topicName, policy, operation, originalRole, role, authData).get(
+                            conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -596,9 +599,10 @@ public class AuthorizationService {
                                        TopicOperation operation,
                                        String originalRole,
                                        String role,
-                                       AuthenticationDataSource authData) {
+                                       AuthenticationDataSource authData) throws Exception {
         try {
-            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
+            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
+                    conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index baab14e88e1..9c553f273ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -3998,46 +3997,55 @@ public class PersistentTopicsBase extends AdminResource {
             PulsarService pulsar, String clientAppId, String originalPrincipal,
             AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
-        try {
-            // (1) authorize client
-            try {
-                checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
-            } catch (RestException e) {
-                try {
-                    validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
-                            pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
-                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
-                            clientAppId, topicName, authException.getMessage()));
-                }
-            } catch (Exception ex) {
-                // throw without wrapping to PulsarClientException that considers: unknown error marked as internal
-                // server error
-                log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, ex);
-                throw ex;
-            }
+        CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
+        checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
+                .thenRun(() -> authorizationFuture.complete(null))
+                .exceptionally(e -> {
+                    Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                    if (throwable instanceof RestException) {
+                        validateAdminAccessForTenantAsync(pulsar,
+                                clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
+                                .thenRun(() -> {
+                                    authorizationFuture.complete(null);
+                                }).exceptionally(ex -> {
+                                    Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
+                                    if (throwable2 instanceof RestException) {
+                                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
+                                        authorizationFuture.completeExceptionally(new PulsarClientException(
+                                                String.format("Authorization failed %s on topic %s with error %s",
+                                                clientAppId, topicName, throwable2.getMessage())));
+                                    } else {
+                                        authorizationFuture.completeExceptionally(throwable2);
+                                    }
+                                    return null;
+                                });
+                    } else {
+                        // throw without wrapping to PulsarClientException that considers: unknown error marked as
+                        // internal server error
+                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
+                        authorizationFuture.completeExceptionally(throwable);
+                    }
+                    return null;
+                });
 
-            // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
-            // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
-            // producer/consumer
-            checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
-                    .thenCompose(res -> pulsar.getBrokerService()
-                            .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
-                    .thenAccept(metadata -> {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
-                                    metadata.partitions);
-                        }
-                        metadataFuture.complete(metadata);
-                    }).exceptionally(ex -> {
-                        metadataFuture.completeExceptionally(ex.getCause());
-                        return null;
-                    });
-        } catch (Exception ex) {
-            metadataFuture.completeExceptionally(ex);
-        }
+        // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+        // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+        // producer/consumer
+        authorizationFuture.thenCompose(__ ->
+                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+                .thenCompose(res ->
+                        pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+                .thenAccept(metadata -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
+                                metadata.partitions);
+                    }
+                    metadataFuture.complete(metadata);
+                })
+                .exceptionally(e -> {
+                    metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+                    return null;
+                });
         return metadataFuture;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index c8ca671f317..ae1d2a5bab0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -236,24 +237,14 @@ public class TopicLookupBase extends PulsarWebResource {
                             cluster);
                 }
                 validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
-                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
+                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                        requestId, false));
             } else {
                 // (2) authorize client
-                try {
-                    checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
-                            authException.getMessage(), requestId));
-                    return;
-                } catch (Exception e) {
-                    log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.completeExceptionally(e);
-                    return;
-                }
-                // (3) validate global namespace
-                checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
-                        .thenAccept(peerClusterData -> {
+                checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
+                        // (3) validate global namespace
+                        checkLocalOrGetPeerReplicationCluster(pulsarService,
+                                topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
                             if (peerClusterData == null) {
                                 // (4) all validation passed: initiate lookup
                                 validationFuture.complete(null);
@@ -264,21 +255,36 @@ public class TopicLookupBase extends PulsarWebResource {
                             if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
                                     && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
                                 validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService url is not configured", requestId));
+                                        "Redirected cluster's brokerService url is not configured",
+                                        requestId));
                                 return;
                             }
                             validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
-                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
+                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                                    requestId,
                                     false));
-
                         }).exceptionally(ex -> {
-                    validationFuture.complete(
-                            newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
-                    return null;
-                });
+                            validationFuture.complete(
+                                    newLookupErrorResponse(ServerError.MetadataError,
+                                            FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
+                            return null;
+                        });
+                    })
+                    .exceptionally(e -> {
+                        Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                        if (throwable instanceof RestException) {
+                            log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
+                            validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+                                    throwable.getMessage(), requestId));
+                        } else {
+                            log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
+                            validationFuture.completeExceptionally(throwable);
+                        }
+                        return null;
+                    });
             }
         }).exceptionally(ex -> {
-            validationFuture.completeExceptionally(ex);
+            validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5f7fb8fad41..d810de85bf4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -891,18 +891,21 @@ public abstract class PulsarWebResource {
         return null;
     }
 
-    protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,
-            AuthenticationDataSource authenticationData) throws Exception {
+    protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
+                        String role, AuthenticationDataSource authenticationData) {
         if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
             // No enforcing of authorization policies
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         // get zk policy manager
-        if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
-                TopicOperation.LOOKUP, null, role, authenticationData)) {
-            log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
-            throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
-        }
+        return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
+                TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
+                    if (!allow) {
+                        log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
+                        throw new RestException(Status.UNAUTHORIZED,
+                                "Don't have permission to connect to this namespace");
+                    }
+        });
     }
 
     // Used for unit tests access


[pulsar] 01/03: [cleanup][function] refine file io connector (#15250)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4a82c5be8f258645899e6ecafed6e95064c7f040
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 04:49:53 2022 -0700

    [cleanup][function] refine file io connector (#15250)
    
    (cherry picked from commit cbefe3ed9b907e0cf1bed2a16f26055dc23026b0)
---
 .../apache/pulsar/io/file/FileListingThread.java   |  6 +--
 .../apache/pulsar/io/file/FileSourceConfig.java    |  4 +-
 .../pulsar/io/file/FileSourceConfigTests.java      | 44 ++++++++++++++++++----
 3 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
index 4d35682a0cd..dac8f45754c 100644
--- a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
@@ -107,10 +107,10 @@ public class FileListingThread extends Thread {
     private Set<File> performListing(final File directory, final FileFilter filter,
             final boolean recurseSubdirectories) {
         Path p = directory.toPath();
-        if (!Files.isWritable(p) || !Files.isReadable(p)) {
-            throw new IllegalStateException("Directory '" + directory
-                    + "' does not have sufficient permissions (i.e., not writable and readable)");
+        if (!Files.isReadable(p)) {
+            throw new IllegalStateException("Cannot read directory: '" + directory);
         }
+
         final Set<File> queue = new HashSet<>();
         if (!directory.exists()) {
             return queue;
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
index 92d791dac8a..5290c87a783 100644
--- a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
@@ -133,7 +133,7 @@ public class FileSourceConfig implements Serializable {
             throw new IllegalArgumentException("Specified input directory does not exist");
         } else if (!Files.isReadable(Paths.get(inputDirectory))) {
             throw new IllegalArgumentException("Specified input directory is not readable");
-        } else if (Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) {
+        } else if (!Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) {
             throw new IllegalArgumentException("You have requested the consumed files to be deleted, but the "
                     + "source directory is not writeable.");
         }
@@ -175,4 +175,4 @@ public class FileSourceConfig implements Serializable {
                     "The property keepFile must be false if the property processedFileSuffix is set");
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
index 64144e667ad..4a4d8d2a867 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.io.file;
 
+import static org.junit.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -29,6 +31,8 @@ import org.testng.annotations.Test;
 
 public class FileSourceConfigTests {
 
+    private final static String INPUT_DIRECTORY = "/dev/null";
+
     @Test
     public final void loadFromYamlFileTest() throws IOException {
         File yamlFile = getFile("sinkConfig.yaml");
@@ -39,7 +43,7 @@ public class FileSourceConfigTests {
     @Test
     public final void loadFromMapTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/tmp");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("keepFile", false);
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -49,7 +53,7 @@ public class FileSourceConfigTests {
     @Test
     public final void validValidateTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/tmp");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         
         FileSourceConfig config = FileSourceConfig.load(map);
         assertNotNull(config);
@@ -70,7 +74,7 @@ public class FileSourceConfigTests {
     @Test(expectedExceptions = com.fasterxml.jackson.databind.exc.InvalidFormatException.class)
     public final void InvalidBooleanPropertyTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("recurse", "not a boolean");
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -82,7 +86,7 @@ public class FileSourceConfigTests {
             expectedExceptionsMessageRegExp = "The property pollingInterval must be greater than zero")
     public final void ZeroValueTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("pollingInterval", 0);
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -94,7 +98,7 @@ public class FileSourceConfigTests {
             expectedExceptionsMessageRegExp = "The property minimumFileAge must be non-negative")
     public final void NegativeValueTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("minimumFileAge", "-50");
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -106,14 +110,40 @@ public class FileSourceConfigTests {
             expectedExceptionsMessageRegExp = "Invalid Regex pattern provided for fileFilter")
     public final void invalidFileFilterTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("fileFilter", "\\");  // Results in a single '\' being sent.
         
         FileSourceConfig config = FileSourceConfig.load(map);
         assertNotNull(config);
         config.validate();
     }
-    
+
+    @Test
+    public final void keepFileTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/"); // root directory that we cannot write to
+        map.put("keepFile", "true"); // even though no write permission on "/", we should still be able to read
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        assertTrue(config.getKeepFile());
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "You have requested the consumed files to be deleted, " +
+                    "but the source directory is not writeable.")
+    public final void invalidKeepFileTest() throws  IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/"); // root directory that we cannot write to
+        map.put("keepFile", "false");
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        assertFalse(config.getKeepFile());
+        config.validate();
+    }
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());


[pulsar] 02/03: [Broker]make revokePermissionsOnTopic method async (#14149)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a7f3efa43abe414afd7f986d77ecb20aa47b279b
Author: Dezhi LIiu <33...@users.noreply.github.com>
AuthorDate: Wed Apr 20 11:24:14 2022 +0800

    [Broker]make revokePermissionsOnTopic method async (#14149)
    
    (cherry picked from commit d7ddda811437096b857bffff7d080a1c555f54d8)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  89 ++++++------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  17 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  11 +-
 .../pulsar/broker/web/PulsarWebResource.java       | 154 +++++++++++++--------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   8 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  13 +-
 6 files changed, 185 insertions(+), 107 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7bdefb29098..baab14e88e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -320,49 +321,54 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
-        Policies policies;
-        try {
-            policies = namespaceResources().getPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-        if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
-                || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
-            log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
-                    role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-
+    private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
+        return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
+                policiesOptional -> {
+                    Policies policies = policiesOptional.orElseThrow(() ->
+                            new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
+                            || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
+                        log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
+                                clientAppId(), role, topicUri);
+                        return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                                "Permissions are not set at the topic level"));
+                    }
+                    // Write the new policies to metadata store
+                    return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+                        p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+                        return p;
+                    }).thenAccept(__ ->
+                            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+                            topicUri)
+                    );
+                }
+        );
     }
 
-    protected void internalRevokePermissionsOnTopic(String role) {
+    protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                revokePermissions(topicNamePartition.toString(), role);
-            }
-        }
-        revokePermissions(topicName.toString(), role);
+        validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+            getPartitionedTopicMetadataAsync(topicName, true, false)
+                .thenCompose(metadata -> {
+                    int numPartitions = metadata.partitions;
+                    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                    if (numPartitions > 0) {
+                        for (int i = 0; i < numPartitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            future = future.thenComposeAsync(unused ->
+                                    revokePermissionsAsync(topicNamePartition.toString(), role));
+                        }
+                    }
+                    return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
+                            .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
+                }))
+            ).exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicName, realCause);
+                    resumeAsyncResponseExceptionally(asyncResponse, realCause);
+                    return null;
+                });
     }
 
     protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
@@ -3999,7 +4005,8 @@ public class PersistentTopicsBase extends AdminResource {
             } catch (RestException e) {
                 try {
                     validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
+                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
+                            pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
                     throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 283ba0ebccd..657f73824aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -141,11 +141,18 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Permissions are not set at the topic level")})
-    public void revokePermissionsOnTopic(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+    public void revokePermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("role") String role) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalRevokePermissionsOnTopic(asyncResponse, role);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c0f69f3270c..3f1350105ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -192,6 +192,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Permissions are not set at the topic level"),
             @ApiResponse(code = 500, message = "Internal server error")})
     public void revokePermissionsOnTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -200,8 +201,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Client role to which grant permissions", required = true)
             @PathParam("role") String role) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalRevokePermissionsOnTopic(asyncResponse, role);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d70934b08c7..5f7fb8fad41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -258,7 +259,8 @@ public abstract class PulsarWebResource {
      */
     protected void validateAdminAccessForTenant(String tenant) {
         try {
-            validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData());
+            validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData(),
+                    config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
@@ -268,65 +270,109 @@ public abstract class PulsarWebResource {
     }
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
-                                                       String originalPrincipal, String tenant,
-                                                       AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                String originalPrincipal, String tenant,
+                                                AuthenticationDataSource authenticationData,
+                                                long timeout, TimeUnit unit) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, originalPrincipal, tenant, authenticationData)
+                    .get(timeout, unit);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            Throwable realCause = FutureUtil.unwrapCompletionException(e);
+            if (realCause instanceof WebApplicationException) {
+                throw (WebApplicationException) realCause;
+            } else {
+                throw new RestException(realCause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
-
-        TenantInfo tenantInfo = pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant does not exist"));
-
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId)) {
-                throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
-            }
-
-            validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, originalPrincipal);
-
-            if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                CompletableFuture<Boolean> isProxySuperUserFuture;
-                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
-                try {
-                    AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
-                    isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId, authenticationData);
-
-                    isOriginalPrincipalSuperUserFuture =
-                            authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
-                    boolean proxyAuthorized = isProxySuperUserFuture.get()
-                            || authorizationService.isTenantAdmin(tenant, clientAppId,
-                            tenantInfo, authenticationData).get();
-                    boolean originalPrincipalAuthorized =
-                            isOriginalPrincipalSuperUserFuture.get() || authorizationService.isTenantAdmin(tenant,
-                                    originalPrincipal, tenantInfo, authenticationData).get();
-                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                String.format("Proxy not authorized to access resource (proxy:%s,original:%s)",
-                                        clientAppId, originalPrincipal));
+        return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-                }
-                log.debug("Successfully authorized {} (proxied by {}) on tenant {}",
-                          originalPrincipal, clientAppId, tenant);
-            } else {
-                if (!pulsar.getBrokerService()
-                        .getAuthorizationService()
-                        .isSuperUser(clientAppId, authenticationData)
-                        .join()) {
-                    if (!pulsar.getBrokerService().getAuthorizationService()
-                            .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                "Don't have permission to administrate resources on this tenant");
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+                        }
+                        validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
+                                originalPrincipal);
+                        if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    pulsar.getBrokerService().getAuthorizationService();
+                            return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData)
+                                .thenCompose(isTenantAdmin -> {
+                                    String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}";
+                                    if (!isTenantAdmin) {
+                                            return authorizationService.isSuperUser(clientAppId, authenticationData)
+                                                .thenCombine(authorizationService.isSuperUser(originalPrincipal,
+                                                             authenticationData),
+                                                     (proxyAuthorized, originalPrincipalAuthorized) -> {
+                                                         if (!proxyAuthorized || !originalPrincipalAuthorized) {
+                                                             throw new RestException(Status.UNAUTHORIZED,
+                                                                     String.format("Proxy not authorized to access "
+                                                                                     + "resource (proxy:%s,original:%s)"
+                                                                             , clientAppId, originalPrincipal));
+                                                         } else {
+                                                             if (log.isDebugEnabled()) {
+                                                                 log.debug(debugMsg, originalPrincipal, clientAppId,
+                                                                         tenant);
+                                                             }
+                                                             return null;
+                                                         }
+                                                     });
+                                    } else {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug(debugMsg, originalPrincipal, clientAppId, tenant);
+                                        }
+                                        return CompletableFuture.completedFuture(null);
+                                    }
+                                });
+                        } else {
+                            return pulsar.getBrokerService()
+                                    .getAuthorizationService()
+                                    .isSuperUser(clientAppId, authenticationData)
+                                    .thenCompose(isSuperUser -> {
+                                        if (!isSuperUser) {
+                                            return pulsar.getBrokerService().getAuthorizationService()
+                                                    .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData);
+                                        } else {
+                                            return CompletableFuture.completedFuture(true);
+                                        }
+                                    }).thenAccept(authorized -> {
+                                        if (!authorized) {
+                                            throw new RestException(Status.UNAUTHORIZED,
+                                                    "Don't have permission to administrate resources on this tenant");
+                                        } else {
+                                            log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
+                                        }
+                                    });
+                        }
+                    } else {
+                        return CompletableFuture.completedFuture(null);
                     }
-                }
-
-                log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
-            }
-        }
+                });
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0a8eceddf10..bd70861cd84 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -819,14 +819,16 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 namespace, topic);
         assertEquals(permission.get(role), actions);
         // remove permission
-        persistentTopics.revokePermissionsOnTopic(property, cluster, namespace, topic, role);
-
+        response = mock(AsyncResponse.class);
+        persistentTopics.revokePermissionsOnTopic(response, property, cluster, namespace, topic, role);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         // verify removed permission
         Awaitility.await().untilAsserted(() -> {
             Map<String, Set<AuthAction>> p = persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
             assertTrue(p.isEmpty());
         });
-
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index b8837e5d686..4ccfc08e9b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -683,7 +683,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
-        persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, topicName, role);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, topicName, role);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
         Assert.assertEquals(permissions.get(role), null);
     }
@@ -702,7 +706,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
-        persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role);
+        response = mock(AsyncResponse.class);
+        persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
         Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
                 partitionedTopicName);
         Assert.assertEquals(permissions.get(role), null);