You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/28 03:03:47 UTC

[pulsar] branch bewaremypower/2.8-pick-15694 created (now 3a04eb5798a)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch bewaremypower/2.8-pick-15694
in repository https://gitbox.apache.org/repos/asf/pulsar.git


      at 3a04eb5798a [branch-2.8][fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)

This branch includes the following new commits:

     new 3a04eb5798a [branch-2.8][fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[pulsar] 01/01: [branch-2.8][fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch bewaremypower/2.8-pick-15694
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3a04eb5798a6f73c5b425062f2677a5943f33746
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 9 09:06:28 2022 +0800

    [branch-2.8][fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)
    
    (cherry picked from commit 6af365e36aed74e95ca6e088f453d9513094bb36)
    
    Besides resolving the basic conflicts, this PR
    - migrate `validateAdminAccessForTenantAsync` from #14149
    - migrate `TenantResources#getTenantAsync` from #11693
---
 .../broker/authorization/AuthorizationService.java |  28 ++++--
 .../pulsar/broker/resources/BaseResources.java     |  11 ++-
 .../pulsar/broker/resources/TenantResources.java   |   7 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 104 ++++++++++-----------
 .../pulsar/broker/lookup/TopicLookupBase.java      |  54 ++++++-----
 .../pulsar/broker/web/PulsarWebResource.java       |  97 +++++++++++++++++--
 6 files changed, 205 insertions(+), 96 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 26d04776e5d..90b49603fe8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -42,6 +41,7 @@ import javax.ws.rs.core.Response;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
@@ -394,11 +394,15 @@ public class AuthorizationService {
                                         AuthenticationDataSource authData) {
         try {
             return allowTenantOperationAsync(
-                    tenantName, operation, originalRole, role, authData).get();
+                    tenantName, operation, originalRole, role, authData).get(
+                            conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RestException(e);
         } catch (ExecutionException e) {
             throw new RestException(e.getCause());
+        } catch (TimeoutException e) {
+            throw new RestException(e);
         }
     }
 
@@ -519,11 +523,15 @@ public class AuthorizationService {
                                                  AuthenticationDataSource authData) {
         try {
             return allowNamespacePolicyOperationAsync(
-                    namespaceName, policy, operation, originalRole, role, authData).get();
+                    namespaceName, policy, operation, originalRole, role, authData).get(
+                            conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RestException(e);
         } catch (ExecutionException e) {
             throw new RestException(e.getCause());
+        } catch (TimeoutException e) {
+            throw new RestException(e);
         }
     }
 
@@ -583,11 +591,15 @@ public class AuthorizationService {
                                              AuthenticationDataSource authData) {
         try {
             return allowTopicPolicyOperationAsync(
-                    topicName, policy, operation, originalRole, role, authData).get();
+                    topicName, policy, operation, originalRole, role, authData).get(
+                            conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RestException(e);
         } catch (ExecutionException e) {
             throw new RestException(e.getCause());
+        } catch (TimeoutException e) {
+            throw new RestException(e);
         }
     }
 
@@ -665,13 +677,17 @@ public class AuthorizationService {
                                        TopicOperation operation,
                                        String originalRole,
                                        String role,
-                                       AuthenticationDataSource authData) {
+                                       AuthenticationDataSource authData) throws Exception {
         try {
-            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
+            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
+                    conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RestException(e);
         } catch (ExecutionException e) {
             throw new RestException(e.getCause());
+        } catch (TimeoutException e) {
+            throw new RestException(e);
         }
     }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 8016bcef314..ce772c2bf04 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Joiner;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +39,8 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  */
 public class BaseResources<T> {
 
+    protected static final String BASE_POLICIES_PATH = "/admin/policies";
+
     @Getter
     private final MetadataStoreExtended store;
     @Getter
@@ -164,4 +167,10 @@ public class BaseResources<T> {
     public CompletableFuture<Boolean> existsAsync(String path) {
         return cache.exists(path);
     }
-}
\ No newline at end of file
+
+    protected static String joinPath(String... parts) {
+        StringBuilder sb = new StringBuilder();
+        Joiner.on('/').appendTo(sb, parts);
+        return sb.toString();
+    }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
index 127332e1332..78d80b9d6f7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
@@ -18,12 +18,17 @@
  */
 package org.apache.pulsar.broker.resources;
 
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 public class TenantResources extends BaseResources<TenantInfo> {
     public TenantResources(MetadataStoreExtended store, int operationTimeoutSec) {
         super(store, TenantInfo.class, operationTimeoutSec);
     }
+
+    public CompletableFuture<Optional<TenantInfo>> getTenantAsync(String tenantName) {
+        return getAsync(joinPath(BASE_POLICIES_PATH, tenantName));
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9db3c2e4096..cbcc8cd5a72 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -245,23 +245,6 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected void validateAdminAndClientPermission() {
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-        } catch (Exception ve) {
-            try {
-                checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData());
-            } catch (RestException re) {
-                throw re;
-            } catch (Exception e) {
-                // unknown error marked as internal server error
-                log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}",
-                        topicName, clientAppId(), e.getMessage(), e);
-                throw new RestException(e);
-            }
-        }
-    }
-
     public void validateAdminOperationOnTopic(boolean authoritative) {
         validateAdminAccessForTenant(topicName.getTenant());
         validateTopicOwnership(topicName, authoritative);
@@ -3446,46 +3429,55 @@ public class PersistentTopicsBase extends AdminResource {
             PulsarService pulsar, String clientAppId, String originalPrincipal,
             AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
-        try {
-            // (1) authorize client
-            try {
-                checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
-            } catch (RestException e) {
-                try {
-                    validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
-                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
-                            clientAppId, topicName.toString(), authException.getMessage()));
-                }
-            } catch (Exception ex) {
-                // throw without wrapping to PulsarClientException that considers: unknown error marked as internal
-                // server error
-                log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
-                        topicName.toString(), ex.getMessage(), ex);
-                throw ex;
-            }
+        CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
+        checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
+                .thenRun(() -> authorizationFuture.complete(null))
+                .exceptionally(e -> {
+                    Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                    if (throwable instanceof RestException) {
+                        validateAdminAccessForTenantAsync(pulsar,
+                                clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
+                                .thenRun(() -> {
+                                    authorizationFuture.complete(null);
+                                }).exceptionally(ex -> {
+                                    Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
+                                    if (throwable2 instanceof RestException) {
+                                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
+                                        authorizationFuture.completeExceptionally(new PulsarClientException(
+                                                String.format("Authorization failed %s on topic %s with error %s",
+                                                clientAppId, topicName, throwable2.getMessage())));
+                                    } else {
+                                        authorizationFuture.completeExceptionally(throwable2);
+                                    }
+                                    return null;
+                                });
+                    } else {
+                        // throw without wrapping to PulsarClientException that considers: unknown error marked as
+                        // internal server error
+                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
+                        authorizationFuture.completeExceptionally(throwable);
+                    }
+                    return null;
+                });
 
-            // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
-            // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
-            // producer/consumer
-            checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
-                    .thenCompose(res -> pulsar.getBrokerService()
-                            .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
-                    .thenAccept(metadata -> {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
-                                    metadata.partitions);
-                        }
-                        metadataFuture.complete(metadata);
-                    }).exceptionally(ex -> {
-                        metadataFuture.completeExceptionally(ex.getCause());
-                        return null;
-                    });
-        } catch (Exception ex) {
-            metadataFuture.completeExceptionally(ex);
-        }
+        // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+        // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+        // producer/consumer
+        authorizationFuture.thenCompose(__ ->
+                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+                .thenCompose(res ->
+                        pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+                .thenAccept(metadata -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
+                                metadata.partitions);
+                    }
+                    metadataFuture.complete(metadata);
+                })
+                .exceptionally(e -> {
+                    metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+                    return null;
+                });
         return metadataFuture;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 44d0ff2ec88..7769390ddcb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -217,24 +218,14 @@ public class TopicLookupBase extends PulsarWebResource {
                             cluster);
                 }
                 validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
-                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
+                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                        requestId, false));
             } else {
                 // (2) authorize client
-                try {
-                    checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
-                            authException.getMessage(), requestId));
-                    return;
-                } catch (Exception e) {
-                    log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.completeExceptionally(e);
-                    return;
-                }
-                // (3) validate global namespace
-                checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
-                        .thenAccept(peerClusterData -> {
+                checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
+                        // (3) validate global namespace
+                        checkLocalOrGetPeerReplicationCluster(pulsarService,
+                                topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
                             if (peerClusterData == null) {
                                 // (4) all validation passed: initiate lookup
                                 validationFuture.complete(null);
@@ -245,21 +236,36 @@ public class TopicLookupBase extends PulsarWebResource {
                             if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
                                     && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
                                 validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService url is not configured", requestId));
+                                        "Redirected cluster's brokerService url is not configured",
+                                        requestId));
                                 return;
                             }
                             validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
-                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
+                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                                    requestId,
                                     false));
-
                         }).exceptionally(ex -> {
-                    validationFuture.complete(
-                            newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
-                    return null;
-                });
+                            validationFuture.complete(
+                                    newLookupErrorResponse(ServerError.MetadataError,
+                                            FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
+                            return null;
+                        });
+                    })
+                    .exceptionally(e -> {
+                        Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                        if (throwable instanceof RestException) {
+                            log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
+                            validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+                                    throwable.getMessage(), requestId));
+                        } else {
+                            log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
+                            validationFuture.completeExceptionally(throwable);
+                        }
+                        return null;
+                    });
             }
         }).exceptionally(ex -> {
-            validationFuture.completeExceptionally(ex);
+            validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bc60b4ee326..61fd58ee565 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -261,6 +261,84 @@ public abstract class PulsarWebResource {
         }
     }
 
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId, String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
+        if (log.isDebugEnabled()) {
+            log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
+                    (isClientAuthenticated(clientAppId)), clientAppId);
+        }
+        return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
+                    }
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+                        }
+                        validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
+                                originalPrincipal);
+                        if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    pulsar.getBrokerService().getAuthorizationService();
+                            return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData).thenCompose(isTenantAdmin -> {
+                                String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}";
+                                if (!isTenantAdmin) {
+                                    return authorizationService.isSuperUser(clientAppId, authenticationData)
+                                            .thenCombine(authorizationService.isSuperUser(originalPrincipal,
+                                                            authenticationData),
+                                                    (proxyAuthorized, originalPrincipalAuthorized) -> {
+                                                        if (!proxyAuthorized || !originalPrincipalAuthorized) {
+                                                            throw new RestException(Status.UNAUTHORIZED,
+                                                                    String.format(
+                                                                            "Proxy not authorized to access "
+                                                                                    + "resource (proxy:%s,original:%s)"
+                                                                            , clientAppId, originalPrincipal));
+                                                        } else {
+                                                            if (log.isDebugEnabled()) {
+                                                                log.debug(debugMsg, originalPrincipal,
+                                                                        clientAppId, tenant);
+                                                            }
+                                                            return null;
+                                                        }
+                                                    });
+                                } else {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug(debugMsg, originalPrincipal, clientAppId, tenant);
+                                    }
+                                    return CompletableFuture.completedFuture(null);
+                                }
+                            });
+                        } else {
+                            return pulsar.getBrokerService()
+                                    .getAuthorizationService()
+                                    .isSuperUser(clientAppId, authenticationData)
+                                    .thenCompose(isSuperUser -> {
+                                        if (!isSuperUser) {
+                                            return pulsar.getBrokerService().getAuthorizationService()
+                                                    .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData);
+                                        } else {
+                                            return CompletableFuture.completedFuture(true);
+                                        }
+                                    }).thenAccept(authorized -> {
+                                        if (!authorized) {
+                                            throw new RestException(Status.UNAUTHORIZED,
+                                                    "Don't have permission to administrate resources on this tenant");
+                                        } else {
+                                            log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
+                                        }
+                                    });
+                        }
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
                                                        String originalPrincipal, String tenant,
                                                        AuthenticationDataSource authenticationData)
@@ -806,18 +884,21 @@ public abstract class PulsarWebResource {
         return null;
     }
 
-    protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,
-            AuthenticationDataSource authenticationData) throws Exception {
+    protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
+                        String role, AuthenticationDataSource authenticationData) {
         if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
             // No enforcing of authorization policies
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         // get zk policy manager
-        if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
-                TopicOperation.LOOKUP, null, role, authenticationData)) {
-            log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
-            throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
-        }
+        return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
+                TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
+                    if (!allow) {
+                        log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
+                        throw new RestException(Status.UNAUTHORIZED,
+                                "Don't have permission to connect to this namespace");
+                    }
+        });
     }
 
     // Used for unit tests access