You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/05/18 23:05:32 UTC
[pulsar] branch master updated: [Issue 5720][authorization
provider] Add granularity (#6428)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 029b6f4 [Issue 5720][authorization provider] Add granularity (#6428)
029b6f4 is described below
commit 029b6f4cae5b014d51c9894068590cf3904c5a23
Author: Alexandre DUVAL <ka...@gmail.com>
AuthorDate: Tue May 19 01:05:21 2020 +0200
[Issue 5720][authorization provider] Add granularity (#6428)
Fixes #5720
### Motivation
Provide "real" authz abilities to pulsar resources.
### Modifications
Add stuff to `AuthorizationProvider` interface, and use them on every pulsar resource management auth (tenant, namespace, topics, functions, connectors, ...)
---
.../authentication/AuthenticationDataCommand.java | 32 +++-
.../authentication/AuthenticationDataSource.java | 23 +++
.../authorization/AuthorizationProvider.java | 118 ++++++++++++++
.../broker/authorization/AuthorizationService.java | 173 +++++++++++++++++++++
.../authorization/PulsarAuthorizationProvider.java | 125 ++++++++++++++-
.../pulsar/broker/admin/impl/NamespacesBase.java | 153 +++++++++---------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 25 +--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 28 ++--
.../apache/pulsar/broker/service/ServerCnx.java | 33 ++--
.../pulsar/broker/web/PulsarWebResource.java | 70 +++++++++
.../apache/pulsar/broker/admin/NamespacesTest.java | 20 ++-
.../pulsar/broker/service/ServerCnxTest.java | 16 +-
.../api/AuthorizationProducerConsumerTest.java | 65 +++++++-
.../service/web/DiscoveryServiceWebTest.java | 8 +-
.../common/policies/data/NamespaceOperation.java | 41 +++++
.../pulsar/common/policies/data/PolicyName.java | 45 ++++++
.../common/policies/data/PolicyOperation.java | 28 ++++
.../common/policies/data/TenantOperation.java | 29 ++++
.../common/policies/data/TopicOperation.java | 50 ++++++
19 files changed, 941 insertions(+), 141 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
index 6539fb3..7299eae 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
@@ -28,9 +28,22 @@ public class AuthenticationDataCommand implements AuthenticationDataSource {
protected final String authData;
protected final SocketAddress remoteAddress;
protected final SSLSession sslSession;
+ protected String subscription;
public AuthenticationDataCommand(String authData) {
- this(authData, null, null);
+ this(authData, null, null, null);
+ }
+
+ public AuthenticationDataCommand(String authData, String subscription) {
+ this(authData, null, null, subscription);
+ }
+
+ public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession,
+ String subscription) {
+ this.authData = authData;
+ this.remoteAddress = remoteAddress;
+ this.sslSession = sslSession;
+ this.subscription = subscription;
}
public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession) {
@@ -85,4 +98,21 @@ public class AuthenticationDataCommand implements AuthenticationDataSource {
}
}
+ /*
+ * Subscription
+ */
+ @Override
+ public boolean hasSubscription() {
+ return this.subscription != null;
+ }
+
+ @Override
+ public void setSubscription(String subscription) {
+ this.subscription = subscription;
+ }
+
+ @Override
+ public String getSubscription() {
+ return subscription;
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
index b72b99b..eb9ed2b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
@@ -127,4 +127,27 @@ public interface AuthenticationDataSource {
default SocketAddress getPeerAddress() {
return null;
}
+
+ /**
+ * Check if subscription is defined available.
+ *
+ * @return true if this authentication data contain subscription
+ */
+ default boolean hasSubscription() {
+ return false;
+ }
+
+ /**
+ * Subscription name can be necessary for consumption
+ *
+ * @return a <code>String</code> containing the subscription name
+ */
+ default String getSubscription() { return null; }
+
+ /**
+ * Subscription name can be necessary for consumption
+ *
+ * @return a <code>String</code> containing the subscription name
+ */
+ default void setSubscription(String subscription) { };
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 572e403..c4fcc5e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -29,7 +30,14 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
/**
* Provider of authorization mechanism
@@ -186,4 +194,114 @@ public interface AuthorizationProvider extends Closeable {
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);
+ /**
+ * Grant authorization-action permission on a tenant to the given client
+ * @param tenantName
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture<Boolean>
+ */
+ default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" +
+ " provider you are using.",
+ operation.toString(), tenantName)));
+ }
+
+ default Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTenantOperationAsync(tenantName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ * @param namespaceName
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture<Boolean>
+ */
+ default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
+ String role, NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("NamespaceOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role,
+ NamespaceOperation operation, AuthenticationDataSource authData) {
+ try {
+ return allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ * @param namespaceName
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture<Boolean>
+ */
+ default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalRole,
+ String role, AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("NamespacePolicyOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation,
+ String originalRole, String role, AuthenticationDataSource authData) {
+ try {
+ return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+
+ /**
+ * Grant authorization-action permission on a topic to the given client
+ * @param topic
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture<Boolean>
+ */
+ default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("TopicOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 3bf4458..e92ab84 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -21,12 +21,19 @@ package org.apache.pulsar.broker.authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -318,4 +325,170 @@ public class AuthorizationService {
AuthenticationDataSource authenticationData) {
return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData);
}
+
+ /**
+ * Grant authorization-action permission on a tenant to the given client
+ *
+ * @param tenantName
+ * @param operation
+ * @param originalRole
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when tenant not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, TenantOperation operation,
+ String originalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowTenantOperationAsync(tenantName, originalRole, role, operation, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowTenantOperationAsync"));
+ }
+
+ public Boolean allowTenantOperation(String tenantName, TenantOperation operation, String orignalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowTenantOperation(tenantName, orignalRole, role, operation, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowTenantOperation");
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ *
+ * @param namespaceName
+ * @param operation
+ * @param originalRole
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+ NamespaceOperation operation,
+ String originalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowNamespaceOperationAsync"));
+ }
+
+ public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation,
+ String originalPrincipal, String role, AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowNamespaceOperation(namespaceName, originalPrincipal, role, operation, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowNamespaceOperation");
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ *
+ * @param namespaceName
+ * @param operation
+ * @param originalRole
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalRole,
+ String role, AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowNamespacePolicyOperationAsync"));
+ }
+
+ public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalPrincipal, String role,
+ AuthenticationDataHttps authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal, role, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowNamespacePolicyOperation");
+ }
+
+ /**
+ * Grant authorization-action permission on a topic to the given client
+ *
+ * @param topicName
+ * @param operation
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, TopicOperation operation,
+ String originalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowTopicOperationAsync(topicName, originalRole, role, operation, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowTopicOperationAsync"));
+ }
+
+ public Boolean allowTopicOperation(TopicName topicName, TopicOperation operation,
+ String orignalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowTopicOperation(topicName, orignalRole, role, operation, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowTopicOperation");
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index b025f80..40b2021 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -27,8 +27,9 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -36,8 +37,17 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
+
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -46,7 +56,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
+import javax.ws.rs.core.Response;
/**
* Default authorization provider that stores authorization policies under local-zookeeper.
@@ -501,4 +511,115 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
}
}
+ @Override
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(tenantName, originalRole, role, authData);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
+ String role, NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalRole,
+ String role, AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, String originalRole, String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ CompletableFuture<Boolean> isAuthorizedFuture;
+
+ switch (operation) {
+ case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, authData);
+ break;
+ case PRODUCE: isAuthorizedFuture= canProduceAsync(topicName, role, authData);
+ break;
+ case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
+ break;
+ default: isAuthorizedFuture = FutureUtil.failedFuture(
+ new IllegalStateException("TopicOperation is not supported."));
+ }
+
+ CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, conf);
+
+ return isSuperUserFuture
+ .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> isSuperUser || isAuthorized);
+ }
+
+ private static String path(String... parts) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("/admin/");
+ Joiner.on('/').appendTo(sb, parts);
+ return sb.toString();
+ }
+
+ private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String originalRole, String role,
+ AuthenticationDataSource authData) {
+ try {
+ TenantInfo tenantInfo = configCache.propertiesCache()
+ .get(path(POLICIES, tenantName))
+ .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
+
+ validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+
+ if (role != null && conf.getProxyRoles().contains(role)) {
+ // role check
+ CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, conf);
+ CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+ CompletableFuture<Boolean> isRoleAuthorizedFuture = isRoleSuperUserFuture
+ .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+ isRoleSuperUser || isRoleTenantAdmin);
+
+ // originalRole check
+ CompletableFuture<Boolean> isOriginalRoleSuperUserFuture = isSuperUser(originalRole, conf);
+ CompletableFuture<Boolean> isOriginalRoleTenantAdminFuture = isTenantAdmin(tenantName, originalRole,
+ tenantInfo, authData);
+ CompletableFuture<Boolean> isOriginalRoleAuthorizedFuture = isOriginalRoleSuperUserFuture
+ .thenCombine(isOriginalRoleTenantAdminFuture, (isOriginalRoleSuperUser, isOriginalRoleTenantAdmin) ->
+ isOriginalRoleSuperUser || isOriginalRoleTenantAdmin);
+
+ // merging
+ return isRoleAuthorizedFuture
+ .thenCombine(isOriginalRoleAuthorizedFuture, (isRoleAuthorized, isOriginalRoleAuthorized) ->
+ isRoleAuthorized && isOriginalRoleAuthorized);
+ } else {
+ // role check
+ CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, conf);
+ CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+ return isRoleSuperUserFuture
+ .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+ isRoleSuperUser || isRoleTenantAdmin);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
+ throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+ } catch (Exception e) {
+ log.error("Failed to get tenant {}", tenantName, e);
+ throw new RestException(e);
+ }
+ }
+
+ private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
+ String originalPrincipal) {
+ if (proxyRoles.contains(authenticatedPrincipal)) {
+ // Request has come from a proxy
+ if (StringUtils.isBlank(originalPrincipal)) {
+ log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
+ throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
+ }
+ if (proxyRoles.contains(originalPrincipal)) {
+ log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
+ throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+ }
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b0504ee..8a04adf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -80,15 +80,19 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
@@ -103,7 +107,7 @@ public abstract class NamespacesBase extends AdminResource {
private static final long MAX_BUNDLES = ((long) 1) << 32;
protected List<String> internalGetTenantNamespaces(String tenant) {
- validateAdminAccessForTenant(tenant);
+ validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
try {
return getListOfNamespaces(tenant);
@@ -117,9 +121,8 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalCreateNamespace(Policies policies) {
+ validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
validatePoliciesReadOnlyAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
-
validatePolicies(namespaceName, policies);
try {
@@ -138,7 +141,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
validatePoliciesReadOnlyAccess();
// ensure that non-global namespace is directed to the correct cluster
@@ -282,7 +285,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
validatePoliciesReadOnlyAccess();
// ensure that non-global namespace is directed to the correct cluster
@@ -353,7 +356,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -384,7 +387,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -414,7 +417,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalRevokePermissionsOnNamespace(String role) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
try {
@@ -444,7 +447,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -457,6 +460,8 @@ public abstract class NamespacesBase extends AdminResource {
}
protected Set<String> internalGetNamespaceReplicationClusters() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ);
+
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot get the replication clusters for a non-global namespace");
@@ -467,7 +472,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
@@ -525,7 +530,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetNamespaceMessageTTL(int messageTTL) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (messageTTL < 0) {
@@ -820,7 +825,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalModifyDeduplication(boolean enableDeduplication) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Entry<Policies, Stat> policiesNode = null;
@@ -856,9 +861,8 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalUnloadNamespace(AsyncResponse asyncResponse) {
- log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
-
validateSuperUserAccess();
+ log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
@@ -903,11 +907,10 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffinityGroup) {
+ validateSuperUserAccess();
log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup,
this.namespaceName);
- validateSuperUserAccess();
-
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
@@ -994,9 +997,9 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
+ validateSuperUserAccess();
log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
- validateSuperUserAccess();
Policies policies = getNamespacePolicies(namespaceName);
NamespaceBundle bundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName.toString(), bundleRange);
@@ -1042,9 +1045,9 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
+ validateSuperUserAccess();
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
- validateSuperUserAccess();
Policies policies = getNamespacePolicies(namespaceName);
if (namespaceName.isGlobal()) {
@@ -1095,8 +1098,8 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
- log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
Entry<Policies, Stat> policiesNode = null;
@@ -1132,7 +1135,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected PublishRate internalGetPublishRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
PublishRate publishRate = policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
@@ -1146,8 +1149,8 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
- log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
Entry<Policies, Stat> policiesNode = null;
@@ -1185,7 +1188,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected DispatchRate internalGetTopicDispatchRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1201,8 +1204,8 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
- log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
Entry<Policies, Stat> policiesNode = null;
@@ -1238,7 +1241,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected DispatchRate internalGetSubscriptionDispatchRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1251,9 +1254,10 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
- log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
+
Entry<Policies, Stat> policiesNode = null;
try {
@@ -1288,7 +1292,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected SubscribeRate internalGetSubscribeRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
SubscribeRate subscribeRate = policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
if (subscribeRate != null) {
@@ -1300,8 +1304,8 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
- log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
Entry<Policies, Stat> policiesNode = null;
@@ -1337,7 +1341,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected DispatchRate internalGetReplicatorDispatchRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1350,7 +1354,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (backlogQuotaType == null) {
@@ -1397,7 +1401,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (backlogQuotaType == null) {
@@ -1430,7 +1434,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetRetention(RetentionPolicies retention) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -1468,7 +1472,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetPersistence(PersistencePolicies persistence) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
validatePersistencePolicies(persistence);
@@ -1499,7 +1503,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected PersistencePolicies internalGetPersistence() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.persistence == null) {
@@ -1511,7 +1515,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
@@ -1553,7 +1557,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1574,7 +1578,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
@@ -1617,7 +1621,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1638,7 +1642,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
@@ -1680,7 +1684,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1700,7 +1704,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscriptionAuthMode) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (subscriptionAuthMode == null) {
@@ -1736,7 +1740,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Entry<Policies, Stat> policiesNode = null;
@@ -1772,7 +1776,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.delayed_delivery_policies == null) {
@@ -1785,7 +1789,6 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
validateSuperUserAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
try {
@@ -1818,7 +1821,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);
@@ -1859,12 +1862,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected String internalGetNamespaceAntiAffinityGroup() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).antiAffinityGroup;
}
protected void internalRemoveNamespaceAntiAffinityGroup() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName);
@@ -1895,7 +1898,7 @@ public abstract class NamespacesBase extends AdminResource {
protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
String tenant) {
- validateAdminAccessForTenant(tenant);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);
@@ -1947,7 +1950,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected RetentionPolicies internalGetRetention() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.retention_policies == null) {
@@ -2140,12 +2143,12 @@ public abstract class NamespacesBase extends AdminResource {
protected int internalGetMaxProducersPerTopic() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_producers_per_topic;
}
protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2181,12 +2184,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected int internalGetMaxConsumersPerTopic() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_topic;
}
protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2222,12 +2225,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected int internalGetMaxConsumersPerSubscription() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
}
protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2263,12 +2266,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected int internalGetMaxUnackedMessagesPerConsumer() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
}
protected void internalSetMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2304,12 +2307,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected int internalGetMaxUnackedMessagesPerSubscription() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
}
protected void internalSetMaxUnackedMessagesPerSubscription(int maxUnackedMessagesPerSubscription) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2345,12 +2348,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected long internalGetCompactionThreshold() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).compaction_threshold;
}
protected void internalSetCompactionThreshold(long newThreshold) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2386,7 +2389,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected long internalGetOffloadThreshold() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.offload_policies == null) {
return policies.offload_threshold;
@@ -2396,7 +2399,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetOffloadThreshold(long newThreshold) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2433,7 +2436,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected Long internalGetOffloadDeletionLag() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.offload_policies == null) {
return policies.offload_deletion_lag_ms;
@@ -2443,7 +2446,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2481,12 +2484,12 @@ public abstract class NamespacesBase extends AdminResource {
@Deprecated
protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
}
protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED){
@@ -2498,7 +2501,7 @@ public abstract class NamespacesBase extends AdminResource {
@Deprecated
protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2509,7 +2512,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2520,13 +2523,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected boolean internalGetSchemaValidationEnforced() {
- validateSuperUserAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_validation_enforced;
}
protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2537,13 +2539,12 @@ public abstract class NamespacesBase extends AdminResource {
}
protected boolean internalGetIsAllowAutoUpdateSchema() {
- validateSuperUserAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).is_allow_auto_update_schema;
}
protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2586,7 +2587,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPolicies offloadPolicies) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
validateOffloadPolicies(offloadPolicies);
@@ -2663,7 +2664,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected OffloadPolicies internalGetOffloadPolicies() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.offload_policies;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 700381d..c9e6def 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -35,12 +36,16 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +94,7 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Property or cluster doesn't exist") })
public List<String> getNamespacesForCluster(@PathParam("property") String property,
@PathParam("cluster") String cluster) {
- validateAdminAccessForTenant(property);
+ validateTenantOperation(property, TenantOperation.LIST_NAMESPACES);
List<String> namespaces = Lists.newArrayList();
if (!clusters().contains(cluster)) {
log.warn("[{}] Failed to get namespace list for property: {}/{} - Cluster does not exist", clientAppId(),
@@ -121,16 +126,14 @@ public class Namespaces extends NamespacesBase {
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
- .thenAccept(topics -> {
- asyncResponse.resume(topics);
- })
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
asyncResponse.resume(ex);
@@ -145,8 +148,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public Policies getPolicies(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.ALL, PolicyOperation.READ);
return getNamespacePolicies(namespaceName);
}
@@ -228,8 +231,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 409, message = "Namespace is not empty") })
public Map<String, Set<AuthAction>> getPermissions(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION);
Policies policies = getNamespacePolicies(namespaceName);
return policies.auth_policies.namespace_auth;
@@ -294,8 +297,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 412, message = "Namespace is not global") })
public Set<String> getNamespaceReplicationClusters(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.REPLICATION, PolicyOperation.READ);
return internalGetNamespaceReplicationClusters();
}
@@ -320,8 +323,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public int getNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.TTL, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.message_ttl_in_seconds;
@@ -505,9 +508,9 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
public BundlesData getBundlesData(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validatePoliciesReadOnlyAccess();
validateNamespaceName(property, cluster, namespace);
+ validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_BUNDLE);
Policies policies = getNamespacePolicies(namespaceName);
@@ -638,8 +641,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.BACKLOG, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.backlog_quota_map;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 95cf65b..4f123a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -53,9 +54,12 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -64,6 +68,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.TenantOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,16 +96,14 @@ public class Namespaces extends NamespacesBase {
@PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
- .thenAccept(topics -> {
- asyncResponse.resume(topics);
- })
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
asyncResponse.resume(ex);
@@ -114,8 +117,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.ALL, PolicyOperation.READ);
return getNamespacePolicies(namespaceName);
}
@@ -129,7 +132,7 @@ public class Namespaces extends NamespacesBase {
public void createNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
Policies policies) {
validateNamespaceName(tenant, namespace);
-
+ validateTenantOperation(tenant, TenantOperation.CREATE_NAMESPACE);
policies = getDefaultPolicesIfNull(policies);
internalCreateNamespace(policies);
}
@@ -178,8 +181,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 409, message = "Namespace is not empty") })
public Map<String, Set<AuthAction>> getPermissions(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION);
Policies policies = getNamespacePolicies(namespaceName);
return policies.auth_policies.namespace_auth;
@@ -244,9 +247,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 412, message = "Namespace is not global") })
public Set<String> getNamespaceReplicationClusters(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
-
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.REPLICATION, PolicyOperation.READ);
return internalGetNamespaceReplicationClusters();
}
@@ -270,9 +272,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public int getNamespaceMessageTTL(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
-
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.TTL, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.message_ttl_in_seconds;
@@ -410,9 +411,9 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
public BundlesData getBundlesData(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
validateNamespaceName(tenant, namespace);
+ validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_BUNDLE);
Policies policies = getNamespacePolicies(namespaceName);
@@ -584,9 +585,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
-
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.BACKLOG, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.backlog_quota_map;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1eefc37..bc04f33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -59,6 +59,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -77,6 +78,7 @@ import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -292,8 +294,8 @@ public class ServerCnx extends PulsarHandler {
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().canLookupAsync(topicName, authRole,
- authenticationData);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -364,8 +366,8 @@ public class ServerCnx extends PulsarHandler {
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService()
- .canLookupAsync(topicName, authRole, authenticationData);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -760,8 +762,9 @@ public class ServerCnx extends PulsarHandler {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().canConsumeAsync(topicName, authRole,
- authenticationData, subscribe.getSubscription());
+ authenticationData.setSubscription(subscriptionName);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -769,9 +772,13 @@ public class ServerCnx extends PulsarHandler {
if (isProxyAuthorized) {
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
- originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
- subscriptionName);
+ if (authenticationData == null) {
+ authenticationData = new AuthenticationDataCommand("", subscriptionName);
+ } else {
+ authenticationData.setSubscription(subscriptionName);
+ }
+ authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
@@ -979,8 +986,8 @@ public class ServerCnx extends PulsarHandler {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().canProduceAsync(topicName,
- authRole, authenticationData);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -988,8 +995,8 @@ public class ServerCnx extends PulsarHandler {
if (isProxyAuthorized) {
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationService().canProduceAsync(topicName,
- originalPrincipal != null ? originalPrincipal : authRole, authenticationData);
+ authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 2273d1d..96569e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -61,8 +61,13 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -771,4 +776,69 @@ public abstract class PulsarWebResource {
// Non-Usual HTTP error codes
protected static final int NOT_IMPLEMENTED = 501;
+ public void validateTenantOperation(String tenant, TenantOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowTenantOperation(
+ tenant, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTenantOperation for" +
+ " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]",
+ originalPrincipal(), clientAppId(), operation.toString(), tenant));
+ }
+ }
+ }
+
+ public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowNamespaceOperation(namespaceName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespaceOperation for" +
+ " operation [%s] on namespace [%s]", operation.toString(), namespaceName));
+ }
+ }
+ }
+
+ public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespacePolicyOperation for" +
+ " operation [%s] on namespace [%s] on policy [%s]", operation.toString(), namespaceName, policy.toString()));
+ }
+ }
+ }
+
+ public void validateTopicOperation(TopicName topicName, TopicOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" +
+ " operation [%s] on topic [%s]", operation.toString(), topicName));
+ }
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index f98758c..2df24bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -81,9 +81,12 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -154,9 +157,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(null).when(namespaces).originalPrincipal();
doReturn(null).when(namespaces).clientAuthData();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
- doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
- doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
- doNothing().when(namespaces).validateAdminAccessForTenant("new-property");
admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:8080"));
admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:8080"));
@@ -171,7 +171,15 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
new BundlesData());
doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
- .validateAdminAccessForTenant(this.testOtherTenant);
+ .validateTenantOperation(this.testOtherTenant, null);
+
+ doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+ .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+ doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+ .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.REPLICATION, PolicyOperation.WRITE);
nsSvc = pulsar.getNamespaceService();
}
@@ -878,7 +886,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testValidateAdminAccessOnTenant() throws Exception {
-
try {
final String property = "prop";
pulsar.getConfiguration().setAuthenticationEnabled(true);
@@ -888,7 +895,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
new TenantInfo(Sets.newHashSet(namespaces.clientAppId()), Sets.newHashSet("use")));
ZkUtils.createFullPathOptimistic(pulsar.getConfigurationCache().getZooKeeper(), path, data.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- namespaces.validateAdminAccessForTenant(property);
+
+ namespaces.validateTenantOperation(property, null);
} finally {
pulsar.getConfiguration().setAuthenticationEnabled(false);
pulsar.getConfiguration().setAuthorizationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index cb913fb..aa71801 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -478,8 +478,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testProducerCommandWithAuthorizationPositive() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canProduceAsync(Mockito.any(),
- Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
resetChannel();
@@ -605,8 +605,8 @@ public class ServerCnxTest {
public void testProducerCommandWithAuthorizationNegative() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canProduceAsync(Mockito.any(),
- Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1195,8 +1195,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canConsumeAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1217,8 +1217,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canConsumeAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 7b324c0..ddeed96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
@@ -49,7 +50,11 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -487,6 +492,41 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
String subscriptionName, String role, String authDataJson) {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ return true;
+ }
}
/**
@@ -515,21 +555,32 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
}
public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
+ @Override
+ public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ try {
+ return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
@Override
- public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
- AuthenticationDataSource authenticationData, String subscription) {
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- if (isNotBlank(subscription)) {
- if (!subscription.startsWith(role)) {
- future.completeExceptionally(new PulsarServerException(
- "The subscription name needs to be prefixed by the authentication role"));
+ if (authData.hasSubscription()) {
+ String subscription = authData.getSubscription();
+ if (isNotBlank(subscription)) {
+ if (!subscription.startsWith(role)) {
+ future.completeExceptionally(new PulsarServerException(
+ "The subscription name needs to be prefixed by the authentication role"));
+ }
}
}
future.complete(clientRole.equals(role));
return future;
}
-
}
public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
index cc3cb3b..ec9d4e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -61,6 +61,8 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
+ super.conf.setAuthorizationEnabled(true);
+ super.conf.setAuthenticationEnabled(true);
}
@AfterMethod
@@ -99,9 +101,9 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
**/
assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")),
- "Tenant does not exist");
- assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Tenant does not exist");
- assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Tenant does not exist");
+ "Need to authenticate to perform the request");
+ assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Need to authenticate to perform the request");
+ assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Need to authenticate to perform the request");
server.stop();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
new file mode 100644
index 0000000..bda93c4
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Namespace authorization operations.
+ */
+public enum NamespaceOperation {
+ CREATE_TOPIC,
+ GET_TOPIC,
+ GET_TOPICS,
+ DELETE_TOPIC,
+
+ ADD_BUNDLE,
+ DELETE_BUNDLE,
+ GET_BUNDLE,
+
+ GET_PERMISSION,
+ GRANT_PERMISSION,
+ REVOKE_PERMISSION,
+
+ CLEAR_BACKLOG,
+ UNSUBSCRIBE,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
new file mode 100644
index 0000000..439ed7b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * PolicyName authorization operations.
+ */
+public enum PolicyName {
+ ALL,
+ ANTI_AFFINITY,
+ BACKLOG,
+ COMPACTION,
+ DELAYED_DELIVERY,
+ DEDUPLICATION,
+ MAX_CONSUMERS,
+ MAX_PRODUCERS,
+ MAX_UNACKED,
+ OFFLOAD,
+ PERSISTENCE,
+ RATE,
+ RETENTION,
+ REPLICATION,
+ REPLICATION_RATE,
+ SCHEMA_COMPATIBILITY_STRATEGY,
+ SUBSCRIPTION_AUTH_MODE,
+ ENCRYPTION,
+ TTL,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java
new file mode 100644
index 0000000..ce70341
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * PolicyOperation authorization operations.
+ */
+public enum PolicyOperation {
+ READ,
+ WRITE,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java
new file mode 100644
index 0000000..b444433
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Tenant authorization operations.
+ */
+public enum TenantOperation {
+ CREATE_NAMESPACE,
+ DELETE_NAMESPACE,
+ LIST_NAMESPACES,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
new file mode 100644
index 0000000..7e54cca
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Topic authorization operations.
+ */
+public enum TopicOperation {
+ LOOKUP,
+ PRODUCE,
+ CONSUME,
+
+ COMPACT,
+ EXPIRE_MESSAGES,
+ OFFLOAD,
+ PEEK_MESSAGES,
+ RESET_CURSOR,
+ SKIP,
+ TERMINATE,
+ UNLOAD,
+
+ GRANT_PERMISSION,
+ GET_PERMISSION,
+ REVOKE_PERMISSION,
+
+ ADD_BUNDLE_RANGE,
+ GET_BUNDLE_RANGE,
+ DELETE_BUNDLE_RANGE,
+
+ SUBSCRIBE,
+ GET_SUBSCRIPTIONS,
+ UNSUBSCRIBE,
+}