You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 13:24:56 UTC
[pulsar] 03/03: [fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c88bd1b20bb57bb420b9eb4f30946afcc1d83cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 9 09:06:28 2022 +0800
[fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)
(cherry picked from commit 6af365e36aed74e95ca6e088f453d9513094bb36)
---
.../broker/authorization/AuthorizationService.java | 20 +++--
.../broker/admin/impl/PersistentTopicsBase.java | 88 ++++++++++++----------
.../pulsar/broker/lookup/TopicLookupBase.java | 54 +++++++------
.../pulsar/broker/web/PulsarWebResource.java | 19 +++--
4 files changed, 101 insertions(+), 80 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 23f650678e8..3baaf57990a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -357,10 +357,11 @@ public class AuthorizationService {
TenantOperation operation,
String originalRole,
String role,
- AuthenticationDataSource authData) {
+ AuthenticationDataSource authData) throws Exception {
try {
return allowTenantOperationAsync(
- tenantName, operation, originalRole, role, authData).get();
+ tenantName, operation, originalRole, role, authData).get(
+ conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
@@ -455,10 +456,11 @@ public class AuthorizationService {
PolicyOperation operation,
String originalRole,
String role,
- AuthenticationDataSource authData) {
+ AuthenticationDataSource authData) throws Exception {
try {
return allowNamespacePolicyOperationAsync(
- namespaceName, policy, operation, originalRole, role, authData).get();
+ namespaceName, policy, operation, originalRole, role, authData).get(
+ conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
@@ -516,10 +518,11 @@ public class AuthorizationService {
PolicyOperation operation,
String originalRole,
String role,
- AuthenticationDataSource authData) {
+ AuthenticationDataSource authData) throws Exception {
try {
return allowTopicPolicyOperationAsync(
- topicName, policy, operation, originalRole, role, authData).get();
+ topicName, policy, operation, originalRole, role, authData).get(
+ conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
@@ -596,9 +599,10 @@ public class AuthorizationService {
TopicOperation operation,
String originalRole,
String role,
- AuthenticationDataSource authData) {
+ AuthenticationDataSource authData) throws Exception {
try {
- return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
+ return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
+ conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index baab14e88e1..9c553f273ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -3998,46 +3997,55 @@ public class PersistentTopicsBase extends AdminResource {
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
- try {
- // (1) authorize client
- try {
- checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
- } catch (RestException e) {
- try {
- validateAdminAccessForTenant(pulsar,
- clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
- pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
- } catch (RestException authException) {
- log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
- throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
- clientAppId, topicName, authException.getMessage()));
- }
- } catch (Exception ex) {
- // throw without wrapping to PulsarClientException that considers: unknown error marked as internal
- // server error
- log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, ex);
- throw ex;
- }
+ CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
+ checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
+ .thenRun(() -> authorizationFuture.complete(null))
+ .exceptionally(e -> {
+ Throwable throwable = FutureUtil.unwrapCompletionException(e);
+ if (throwable instanceof RestException) {
+ validateAdminAccessForTenantAsync(pulsar,
+ clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
+ .thenRun(() -> {
+ authorizationFuture.complete(null);
+ }).exceptionally(ex -> {
+ Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
+ if (throwable2 instanceof RestException) {
+ log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
+ authorizationFuture.completeExceptionally(new PulsarClientException(
+ String.format("Authorization failed %s on topic %s with error %s",
+ clientAppId, topicName, throwable2.getMessage())));
+ } else {
+ authorizationFuture.completeExceptionally(throwable2);
+ }
+ return null;
+ });
+ } else {
+ // throw without wrapping to PulsarClientException that considers: unknown error marked as
+ // internal server error
+ log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
+ authorizationFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
- // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
- // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
- // producer/consumer
- checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
- .thenCompose(res -> pulsar.getBrokerService()
- .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
- .thenAccept(metadata -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
- metadata.partitions);
- }
- metadataFuture.complete(metadata);
- }).exceptionally(ex -> {
- metadataFuture.completeExceptionally(ex.getCause());
- return null;
- });
- } catch (Exception ex) {
- metadataFuture.completeExceptionally(ex);
- }
+ // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+ // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+ // producer/consumer
+ authorizationFuture.thenCompose(__ ->
+ checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+ .thenCompose(res ->
+ pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+ .thenAccept(metadata -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
+ metadata.partitions);
+ }
+ metadataFuture.complete(metadata);
+ })
+ .exceptionally(e -> {
+ metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+ return null;
+ });
return metadataFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index c8ca671f317..ae1d2a5bab0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -236,24 +237,14 @@ public class TopicLookupBase extends PulsarWebResource {
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
- differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
+ differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+ requestId, false));
} else {
// (2) authorize client
- try {
- checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
- } catch (RestException authException) {
- log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
- validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
- authException.getMessage(), requestId));
- return;
- } catch (Exception e) {
- log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
- validationFuture.completeExceptionally(e);
- return;
- }
- // (3) validate global namespace
- checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
- .thenAccept(peerClusterData -> {
+ checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
+ // (3) validate global namespace
+ checkLocalOrGetPeerReplicationCluster(pulsarService,
+ topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
@@ -264,21 +255,36 @@ public class TopicLookupBase extends PulsarWebResource {
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
- "Redirected cluster's brokerService url is not configured", requestId));
+ "Redirected cluster's brokerService url is not configured",
+ requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
- peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
+ peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+ requestId,
false));
-
}).exceptionally(ex -> {
- validationFuture.complete(
- newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
- return null;
- });
+ validationFuture.complete(
+ newLookupErrorResponse(ServerError.MetadataError,
+ FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
+ return null;
+ });
+ })
+ .exceptionally(e -> {
+ Throwable throwable = FutureUtil.unwrapCompletionException(e);
+ if (throwable instanceof RestException) {
+ log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
+ validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+ throwable.getMessage(), requestId));
+ } else {
+ log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
+ validationFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
}
}).exceptionally(ex -> {
- validationFuture.completeExceptionally(ex);
+ validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5f7fb8fad41..d810de85bf4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -891,18 +891,21 @@ public abstract class PulsarWebResource {
return null;
}
- protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,
- AuthenticationDataSource authenticationData) throws Exception {
+ protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
+ String role, AuthenticationDataSource authenticationData) {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
- return;
+ return CompletableFuture.completedFuture(null);
}
// get zk policy manager
- if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
- TopicOperation.LOOKUP, null, role, authenticationData)) {
- log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
- throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
- }
+ return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
+ if (!allow) {
+ log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
+ throw new RestException(Status.UNAUTHORIZED,
+ "Don't have permission to connect to this namespace");
+ }
+ });
}
// Used for unit tests access