You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 13:24:56 UTC

[pulsar] 03/03: [fix][security] Add timeout to sync methods and avoid calling sync methods in AuthorizationService (#15694)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c88bd1b20bb57bb420b9eb4f30946afcc1d83cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 9 09:06:28 2022 +0800

    [fix][security] Add timeout to sync methods and avoid calling sync methods in AuthorizationService (#15694)
    
    (cherry picked from commit 6af365e36aed74e95ca6e088f453d9513094bb36)
---
 .../broker/authorization/AuthorizationService.java | 20 +++--
 .../broker/admin/impl/PersistentTopicsBase.java    | 88 ++++++++++++----------
 .../pulsar/broker/lookup/TopicLookupBase.java      | 54 +++++++------
 .../pulsar/broker/web/PulsarWebResource.java       | 19 +++--
 4 files changed, 101 insertions(+), 80 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 23f650678e8..3baaf57990a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -357,10 +357,11 @@ public class AuthorizationService {
                                         TenantOperation operation,
                                         String originalRole,
                                         String role,
-                                        AuthenticationDataSource authData) {
+                                        AuthenticationDataSource authData) throws Exception {
         try {
             return allowTenantOperationAsync(
-                    tenantName, operation, originalRole, role, authData).get();
+                    tenantName, operation, originalRole, role, authData).get(
+                            conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -455,10 +456,11 @@ public class AuthorizationService {
                                                  PolicyOperation operation,
                                                  String originalRole,
                                                  String role,
-                                                 AuthenticationDataSource authData) {
+                                                 AuthenticationDataSource authData) throws Exception {
         try {
             return allowNamespacePolicyOperationAsync(
-                    namespaceName, policy, operation, originalRole, role, authData).get();
+                    namespaceName, policy, operation, originalRole, role, authData).get(
+                            conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -516,10 +518,11 @@ public class AuthorizationService {
                                              PolicyOperation operation,
                                              String originalRole,
                                              String role,
-                                             AuthenticationDataSource authData) {
+                                             AuthenticationDataSource authData) throws Exception {
         try {
             return allowTopicPolicyOperationAsync(
-                    topicName, policy, operation, originalRole, role, authData).get();
+                    topicName, policy, operation, originalRole, role, authData).get(
+                            conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -596,9 +599,10 @@ public class AuthorizationService {
                                        TopicOperation operation,
                                        String originalRole,
                                        String role,
-                                       AuthenticationDataSource authData) {
+                                       AuthenticationDataSource authData) throws Exception {
         try {
-            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
+            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
+                    conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index baab14e88e1..9c553f273ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -3998,46 +3997,55 @@ public class PersistentTopicsBase extends AdminResource {
             PulsarService pulsar, String clientAppId, String originalPrincipal,
             AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
-        try {
-            // (1) authorize client
-            try {
-                checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
-            } catch (RestException e) {
-                try {
-                    validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
-                            pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
-                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
-                            clientAppId, topicName, authException.getMessage()));
-                }
-            } catch (Exception ex) {
-                // throw without wrapping to PulsarClientException that considers: unknown error marked as internal
-                // server error
-                log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, ex);
-                throw ex;
-            }
+        CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
+        checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
+                .thenRun(() -> authorizationFuture.complete(null))
+                .exceptionally(e -> {
+                    Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                    if (throwable instanceof RestException) {
+                        validateAdminAccessForTenantAsync(pulsar,
+                                clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
+                                .thenRun(() -> {
+                                    authorizationFuture.complete(null);
+                                }).exceptionally(ex -> {
+                                    Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
+                                    if (throwable2 instanceof RestException) {
+                                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
+                                        authorizationFuture.completeExceptionally(new PulsarClientException(
+                                                String.format("Authorization failed %s on topic %s with error %s",
+                                                clientAppId, topicName, throwable2.getMessage())));
+                                    } else {
+                                        authorizationFuture.completeExceptionally(throwable2);
+                                    }
+                                    return null;
+                                });
+                    } else {
+                        // throw without wrapping to PulsarClientException that considers: unknown error marked as
+                        // internal server error
+                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
+                        authorizationFuture.completeExceptionally(throwable);
+                    }
+                    return null;
+                });
 
-            // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
-            // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
-            // producer/consumer
-            checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
-                    .thenCompose(res -> pulsar.getBrokerService()
-                            .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
-                    .thenAccept(metadata -> {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
-                                    metadata.partitions);
-                        }
-                        metadataFuture.complete(metadata);
-                    }).exceptionally(ex -> {
-                        metadataFuture.completeExceptionally(ex.getCause());
-                        return null;
-                    });
-        } catch (Exception ex) {
-            metadataFuture.completeExceptionally(ex);
-        }
+        // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+        // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+        // producer/consumer
+        authorizationFuture.thenCompose(__ ->
+                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+                .thenCompose(res ->
+                        pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+                .thenAccept(metadata -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
+                                metadata.partitions);
+                    }
+                    metadataFuture.complete(metadata);
+                })
+                .exceptionally(e -> {
+                    metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+                    return null;
+                });
         return metadataFuture;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index c8ca671f317..ae1d2a5bab0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -236,24 +237,14 @@ public class TopicLookupBase extends PulsarWebResource {
                             cluster);
                 }
                 validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
-                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
+                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                        requestId, false));
             } else {
                 // (2) authorize client
-                try {
-                    checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
-                            authException.getMessage(), requestId));
-                    return;
-                } catch (Exception e) {
-                    log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.completeExceptionally(e);
-                    return;
-                }
-                // (3) validate global namespace
-                checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
-                        .thenAccept(peerClusterData -> {
+                checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
+                        // (3) validate global namespace
+                        checkLocalOrGetPeerReplicationCluster(pulsarService,
+                                topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
                             if (peerClusterData == null) {
                                 // (4) all validation passed: initiate lookup
                                 validationFuture.complete(null);
@@ -264,21 +255,36 @@ public class TopicLookupBase extends PulsarWebResource {
                             if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
                                     && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
                                 validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService url is not configured", requestId));
+                                        "Redirected cluster's brokerService url is not configured",
+                                        requestId));
                                 return;
                             }
                             validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
-                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
+                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                                    requestId,
                                     false));
-
                         }).exceptionally(ex -> {
-                    validationFuture.complete(
-                            newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
-                    return null;
-                });
+                            validationFuture.complete(
+                                    newLookupErrorResponse(ServerError.MetadataError,
+                                            FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
+                            return null;
+                        });
+                    })
+                    .exceptionally(e -> {
+                        Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                        if (throwable instanceof RestException) {
+                            log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
+                            validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+                                    throwable.getMessage(), requestId));
+                        } else {
+                            log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
+                            validationFuture.completeExceptionally(throwable);
+                        }
+                        return null;
+                    });
             }
         }).exceptionally(ex -> {
-            validationFuture.completeExceptionally(ex);
+            validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5f7fb8fad41..d810de85bf4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -891,18 +891,21 @@ public abstract class PulsarWebResource {
         return null;
     }
 
-    protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,
-            AuthenticationDataSource authenticationData) throws Exception {
+    protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
+                        String role, AuthenticationDataSource authenticationData) {
         if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
             // No enforcing of authorization policies
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         // get zk policy manager
-        if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
-                TopicOperation.LOOKUP, null, role, authenticationData)) {
-            log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
-            throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
-        }
+        return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
+                TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
+                    if (!allow) {
+                        log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
+                        throw new RestException(Status.UNAUTHORIZED,
+                                "Don't have permission to connect to this namespace");
+                    }
+        });
     }
 
     // Used for unit tests access