You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/06/17 05:27:44 UTC

[pulsar] branch branch-2.9 updated: [branch-2.9][fix][security] Add timeout to sync methods and avoid calling sync methods in AuthorizationService (#16083)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 1fa9c2e1797 [branch-2.9][fix][security] Add timeout to sync methods and avoid calling sync methods in AuthorizationService (#16083)
1fa9c2e1797 is described below

commit 1fa9c2e17977cb451c0379581c35c70920a393f3
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Fri Jun 17 13:27:36 2022 +0800

    [branch-2.9][fix][security] Add timeout to sync methods and avoid calling sync methods in AuthorizationService (#16083)
---
 .../broker/authorization/AuthorizationService.java |  24 +++--
 .../broker/admin/impl/PersistentTopicsBase.java    |  89 +++++++++---------
 .../pulsar/broker/lookup/TopicLookupBase.java      |  53 ++++++-----
 .../pulsar/broker/web/PulsarWebResource.java       | 100 +++++++++++++++++++--
 4 files changed, 190 insertions(+), 76 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 6943e95999b..ae36a9bba6e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -43,6 +43,7 @@ import javax.ws.rs.core.Response;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
@@ -396,11 +397,15 @@ public class AuthorizationService {
                                         AuthenticationDataSource authData) {
         try {
             return allowTenantOperationAsync(
-                    tenantName, operation, originalRole, role, authData).get();
+                    tenantName, operation, originalRole, role, authData).get(
+                    conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RestException(e);
         } catch (ExecutionException e) {
             throw new RestException(e.getCause());
+        } catch (TimeoutException e) {
+            throw new RestException(e);
         }
     }
 
@@ -521,11 +526,15 @@ public class AuthorizationService {
                                                  AuthenticationDataSource authData) {
         try {
             return allowNamespacePolicyOperationAsync(
-                    namespaceName, policy, operation, originalRole, role, authData).get();
+                    namespaceName, policy, operation, originalRole, role, authData).get(
+                    conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RestException(e);
         } catch (ExecutionException e) {
             throw new RestException(e.getCause());
+        } catch (TimeoutException e) {
+            throw new RestException(e);
         }
     }
 
@@ -585,11 +594,15 @@ public class AuthorizationService {
                                              AuthenticationDataSource authData) {
         try {
             return allowTopicPolicyOperationAsync(
-                    topicName, policy, operation, originalRole, role, authData).get();
+                    topicName, policy, operation, originalRole, role, authData).get(
+                    conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RestException(e);
         } catch (ExecutionException e) {
             throw new RestException(e.getCause());
+        } catch (TimeoutException e) {
+            throw new RestException(e);
         }
     }
 
@@ -667,9 +680,10 @@ public class AuthorizationService {
                                        TopicOperation operation,
                                        String originalRole,
                                        String role,
-                                       AuthenticationDataSource authData) {
+                                       AuthenticationDataSource authData) throws Exception {
         try {
-            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
+            return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
+                    conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b28a51014ad..4ff236af5ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -233,7 +233,7 @@ public class PersistentTopicsBase extends AdminResource {
             validateAdminAccessForTenant(topicName.getTenant());
         } catch (Exception ve) {
             try {
-                checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData());
+                checkAuthorizationAsync(pulsar(), topicName, clientAppId(), clientAuthData());
             } catch (RestException re) {
                 throw re;
             } catch (Exception e) {
@@ -3559,46 +3559,55 @@ public class PersistentTopicsBase extends AdminResource {
             PulsarService pulsar, String clientAppId, String originalPrincipal,
             AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
-        try {
-            // (1) authorize client
-            try {
-                checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
-            } catch (RestException e) {
-                try {
-                    validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
-                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
-                            clientAppId, topicName.toString(), authException.getMessage()));
-                }
-            } catch (Exception ex) {
-                // throw without wrapping to PulsarClientException that considers: unknown error marked as internal
-                // server error
-                log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
-                        topicName.toString(), ex.getMessage(), ex);
-                throw ex;
-            }
+        CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
+        checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
+                .thenRun(() -> authorizationFuture.complete(null))
+                .exceptionally(e -> {
+                    Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                    if (throwable instanceof RestException) {
+                        validateAdminAccessForTenantAsync(pulsar,
+                                clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
+                                .thenRun(() -> {
+                                    authorizationFuture.complete(null);
+                                }).exceptionally(ex -> {
+                                    Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
+                                    if (throwable2 instanceof RestException) {
+                                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
+                                        authorizationFuture.completeExceptionally(new PulsarClientException(
+                                                String.format("Authorization failed %s on topic %s with error %s",
+                                                        clientAppId, topicName, throwable2.getMessage())));
+                                    } else {
+                                        authorizationFuture.completeExceptionally(throwable2);
+                                    }
+                                    return null;
+                                });
+                    } else {
+                        // throw without wrapping to PulsarClientException that considers: unknown error marked as
+                        // internal server error
+                        log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
+                        authorizationFuture.completeExceptionally(throwable);
+                    }
+                    return null;
+                });
 
-            // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
-            // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
-            // producer/consumer
-            checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
-                    .thenCompose(res -> pulsar.getBrokerService()
-                            .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
-                    .thenAccept(metadata -> {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
-                                    metadata.partitions);
-                        }
-                        metadataFuture.complete(metadata);
-                    }).exceptionally(ex -> {
-                        metadataFuture.completeExceptionally(ex.getCause());
-                        return null;
-                    });
-        } catch (Exception ex) {
-            metadataFuture.completeExceptionally(ex);
-        }
+        // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+        // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+        // producer/consumer
+        authorizationFuture.thenCompose(__ ->
+                        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+                .thenCompose(res ->
+                        pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+                .thenAccept(metadata -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
+                                metadata.partitions);
+                    }
+                    metadataFuture.complete(metadata);
+                })
+                .exceptionally(e -> {
+                    metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+                    return null;
+                });
         return metadataFuture;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index dab1b293e90..967059c0718 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -219,23 +220,14 @@ public class TopicLookupBase extends PulsarWebResource {
                             cluster);
                 }
                 validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
-                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
+                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                        requestId, false));
             } else {
                 // (2) authorize client
-                try {
-                    checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
-                            authException.getMessage(), requestId));
-                    return;
-                } catch (Exception e) {
-                    log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.completeExceptionally(e);
-                    return;
-                }
-                // (3) validate global namespace
-                checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
+                checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
+                    // (3) validate global namespace
+                    checkLocalOrGetPeerReplicationCluster(pulsarService,
+                            topicName.getNamespaceObject())
                         .thenAccept(peerClusterData -> {
                             if (peerClusterData == null) {
                                 // (4) all validation passed: initiate lookup
@@ -247,21 +239,36 @@ public class TopicLookupBase extends PulsarWebResource {
                             if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
                                     && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
                                 validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService url is not configured", requestId));
+                                        "Redirected cluster's brokerService url is not configured",
+                                        requestId));
                                 return;
                             }
                             validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
-                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
+                                    peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+                                    requestId,
                                     false));
-
                         }).exceptionally(ex -> {
-                    validationFuture.complete(
-                            newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
-                    return null;
-                });
+                                validationFuture.complete(
+                                        newLookupErrorResponse(ServerError.MetadataError,
+                                                FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
+                                return null;
+                            });
+                        })
+                        .exceptionally(e -> {
+                            Throwable throwable = FutureUtil.unwrapCompletionException(e);
+                            if (throwable instanceof RestException) {
+                                log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
+                                validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+                                        throwable.getMessage(), requestId));
+                            } else {
+                                log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
+                                validationFuture.completeExceptionally(throwable);
+                            }
+                            return null;
+                        });
             }
         }).exceptionally(ex -> {
-            validationFuture.completeExceptionally(ex);
+            validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 7b5f455ac97..20652d14a53 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -245,6 +245,86 @@ public abstract class PulsarWebResource {
         }
     }
 
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
+        if (log.isDebugEnabled()) {
+            log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
+                    (isClientAuthenticated(clientAppId)), clientAppId);
+        }
+        return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
+                    }
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+                        }
+                        validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
+                                originalPrincipal);
+                        if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    pulsar.getBrokerService().getAuthorizationService();
+                            return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData)
+                                    .thenCompose(isTenantAdmin -> {
+                                        String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}";
+                                        if (!isTenantAdmin) {
+                                            return authorizationService.isSuperUser(clientAppId, authenticationData)
+                                                    .thenCombine(authorizationService.isSuperUser(originalPrincipal,
+                                                                    authenticationData),
+                                                            (proxyAuthorized, originalPrincipalAuthorized) -> {
+                                                                if (!proxyAuthorized || !originalPrincipalAuthorized) {
+                                                                    throw new RestException(Status.UNAUTHORIZED,
+                                                                            String.format(
+                                                                                    "Proxy not authorized to access "
+                                                                                    + "resource (proxy:%s,original:%s)"
+                                                                                    , clientAppId, originalPrincipal));
+                                                                } else {
+                                                                    if (log.isDebugEnabled()) {
+                                                                        log.debug(debugMsg, originalPrincipal,
+                                                                                clientAppId, tenant);
+                                                                    }
+                                                                    return null;
+                                                                }
+                                                            });
+                                        } else {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug(debugMsg, originalPrincipal, clientAppId, tenant);
+                                            }
+                                            return CompletableFuture.completedFuture(null);
+                                        }
+                                    });
+                        } else {
+                            return pulsar.getBrokerService()
+                                    .getAuthorizationService()
+                                    .isSuperUser(clientAppId, authenticationData)
+                                    .thenCompose(isSuperUser -> {
+                                        if (!isSuperUser) {
+                                            return pulsar.getBrokerService().getAuthorizationService()
+                                                    .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData);
+                                        } else {
+                                            return CompletableFuture.completedFuture(true);
+                                        }
+                                    }).thenAccept(authorized -> {
+                                        if (!authorized) {
+                                            throw new RestException(Status.UNAUTHORIZED,
+                                                    "Don't have permission to administrate resources on this tenant");
+                                        } else {
+                                            log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
+                                        }
+                                    });
+                        }
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
                                                        String originalPrincipal, String tenant,
                                                        AuthenticationDataSource authenticationData)
@@ -795,18 +875,22 @@ public abstract class PulsarWebResource {
         return null;
     }
 
-    protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,
-            AuthenticationDataSource authenticationData) throws Exception {
+    protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService,
+                                                                     TopicName topicName, String role,
+                                                                     AuthenticationDataSource authenticationData) {
         if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
             // No enforcing of authorization policies
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         // get zk policy manager
-        if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
-                TopicOperation.LOOKUP, null, role, authenticationData)) {
-            log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
-            throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
-        }
+        return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
+                TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
+            if (!allow) {
+                log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
+                throw new RestException(Status.UNAUTHORIZED,
+                        "Don't have permission to connect to this namespace");
+            }
+        });
     }
 
     // Used for unit tests access