You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/12 13:03:13 UTC
[pulsar] branch master updated: allowTopicOperationAsync should check the original role is super user (#1355) (#7788)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 48f5a2f allowTopicOperationAsync should check the original role is super user (#1355) (#7788)
48f5a2f is described below
commit 48f5a2f62c148b3df617be060fefed51f3145979
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Aug 12 06:02:51 2020 -0700
allowTopicOperationAsync should check the original role is super user (#1355) (#7788)
* Fix allowTopicOperationAsync logic (#1355)
*Modifications*
- We should use the original role to verify if it is allowed for a given topic operation
- use the original authentication data
- Authz provider doesn't have to be aware of proxyRole
- Fix authorization test
* Refactor authorize logic to provide a uniform authorization behavior
---
.../authorization/AuthorizationProvider.java | 230 +++++++++++++++++----
.../broker/authorization/AuthorizationService.java | 217 +++++++++++++------
.../authorization/PulsarAuthorizationProvider.java | 103 ++++-----
.../pulsar/broker/web/AuthenticationFilter.java | 2 -
.../broker/admin/impl/PersistentTopicsBase.java | 30 +++
.../org/apache/pulsar/broker/service/Consumer.java | 2 +-
.../org/apache/pulsar/broker/service/Producer.java | 2 +-
.../apache/pulsar/broker/service/ServerCnx.java | 229 ++++++++++----------
.../pulsar/broker/web/PulsarWebResource.java | 66 +++---
.../pulsar/broker/service/ServerCnxTest.java | 11 +-
.../api/AuthorizationProducerConsumerTest.java | 47 ++---
.../pulsar/proxy/server/AdminProxyHandler.java | 6 +-
12 files changed, 598 insertions(+), 347 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 57147e7..0403f34 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -225,12 +226,19 @@ public interface AuthorizationProvider extends Closeable {
* @param authData
* @return CompletableFuture<Boolean>
*/
+ @Deprecated
default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
TenantOperation operation,
AuthenticationDataSource authData) {
- return isTenantAdmin(tenantName, role, null, authData);
+ return allowTenantOperationAsync(
+ tenantName,
+ StringUtils.isBlank(originalRole) ? role : originalRole,
+ operation,
+ authData
+ );
}
+ @Deprecated
default Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
AuthenticationDataSource authData) {
try {
@@ -243,26 +251,93 @@ public interface AuthorizationProvider extends Closeable {
}
/**
+ * Check if a given <tt>role</tt> is allowed to execute a given <tt>operation</tt> on the tenant.
+ *
+ * @param tenantName tenant name
+ * @param role role name
+ * @param operation tenant operation
+ * @param authData authenticated data of the role
+ * @return a completable future represents check result
+ */
+ default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" +
+ " provider you are using.",
+ operation.toString(), tenantName)));
+ }
+
+ default Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTenantOperationAsync(tenantName, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
+ * Check if a given <tt>role</tt> is allowed to execute a given <tt>operation</tt> on the namespace.
+ *
+ * @param namespaceName namespace name
+ * @param role role name
+ * @param operation namespace operation
+ * @param authData authenticated data
+ * @return a completable future represents check result
+ */
+ default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("NamespaceOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowNamespaceOperation(NamespaceName namespaceName,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ try {
+ return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
* Grant authorization-action permission on a namespace to the given client
+ *
* @param namespaceName
- * @param originalRole role not overriden by proxy role if request do pass through proxy
- * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param role
* @param operation
* @param authData
* @return CompletableFuture<Boolean>
*/
- default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
- String role, NamespaceOperation operation,
- AuthenticationDataSource authData) {
- return FutureUtil.failedFuture(
- new IllegalStateException(
- String.format("NamespaceOperation(%s) on namespace(%s) by role(%s) is not supported" +
- " by the Authorization provider you are using.",
- operation.toString(), namespaceName.toString(), role == null ? "null" : role)));
+ @Deprecated
+ default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+ String originalRole,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return allowNamespaceOperationAsync(
+ namespaceName,
+ StringUtils.isBlank(originalRole) ? role : originalRole,
+ operation,
+ authData
+ );
}
- default Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role,
- NamespaceOperation operation, AuthenticationDataSource authData) {
+ @Deprecated
+ default Boolean allowNamespaceOperation(NamespaceName namespaceName,
+ String originalRole,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
try {
return allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData).get();
} catch (InterruptedException e) {
@@ -273,6 +348,39 @@ public interface AuthorizationProvider extends Closeable {
}
/**
+ * Check if a given <tt>role</tt> is allowed to execute a given policy <tt>operation</tt> on the namespace.
+ *
+ * @param namespaceName namespace name
+ * @param policy policy name
+ * @param operation policy operation
+ * @param role role name
+ * @param authData authenticated data
+ * @return a completable future represents check result
+ */
+ default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String role,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("NamespacePolicyOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String role,
+ AuthenticationDataSource authData) {
+ try {
+ return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
* Grant authorization-action permission on a namespace to the given client
* @param namespaceName
* @param originalRole role not overriden by proxy role if request do pass through proxy
@@ -281,16 +389,32 @@ public interface AuthorizationProvider extends Closeable {
* @param authData
* @return CompletableFuture<Boolean>
*/
- default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
- PolicyOperation operation, String originalRole,
- String role, AuthenticationDataSource authData) {
- return isTenantAdmin(namespaceName.getTenant(), role, null, authData);
+ @Deprecated
+ default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ return allowNamespacePolicyOperationAsync(
+ namespaceName,
+ policy,
+ operation,
+ StringUtils.isBlank(originalRole) ? role : originalRole,
+ authData
+ );
}
- default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation,
- String originalRole, String role, AuthenticationDataSource authData) {
+ @Deprecated
+ default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
try {
- return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData).get();
+ return allowNamespacePolicyOperationAsync(
+ namespaceName, policy, operation, originalRole, role, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
@@ -298,6 +422,35 @@ public interface AuthorizationProvider extends Closeable {
}
}
+ /**
+ * Check if a given <tt>role</tt> is allowed to execute a given topic <tt>operation</tt> on the topic.
+ *
+ * @param topic topic name
+ * @param role role name
+ * @param operation topic operation
+ * @param authData authenticated data
+ * @return CompletableFuture<Boolean>
+ */
+ default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("TopicOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowTopicOperation(TopicName topicName,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTopicOperationAsync(topicName, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
/**
* Grant authorization-action permission on a topic to the given client
@@ -308,27 +461,26 @@ public interface AuthorizationProvider extends Closeable {
* @param authData
* @return CompletableFuture<Boolean>
*/
- default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role,
- TopicOperation operation,
- AuthenticationDataSource authData) {
- switch (operation) {
- case PRODUCE:
- return canProduceAsync(topic, role, authData);
- case CONSUME:
- return canConsumeAsync(topic, role, authData, null);
- case LOOKUP:
- return canLookupAsync(topic, role, authData);
- default:
- return FutureUtil.failedFuture(
- new IllegalStateException(
- String.format("TopicOperation(%s) on topic(%s) by role(%s) is not supported" +
- " by the Authorization provider you are using.",
- operation.toString(), topic.toString(), role == null ? "null" : null)));
- }
+ @Deprecated
+ default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+ String originalRole,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return allowTopicOperationAsync(
+ topic,
+ StringUtils.isBlank(originalRole) ? role : originalRole,
+ operation,
+ authData
+ );
}
- default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation,
- AuthenticationDataSource authData) {
+ @Deprecated
+ default Boolean allowTopicOperation(TopicName topicName,
+ String originalRole,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
try {
return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
} catch (InterruptedException e) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index b91d616..afa85ee 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.authorization;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
-import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -35,12 +37,10 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
import static java.util.concurrent.TimeUnit.SECONDS;
/**
@@ -341,45 +341,84 @@ public class AuthorizationService {
return provider.allowSinkOpsAsync(namespaceName, role, authenticationData);
}
+ private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
+ String originalPrincipal) {
+ if (proxyRoles.contains(authenticatedPrincipal)) {
+ // Request has come from a proxy
+ if (StringUtils.isBlank(originalPrincipal)) {
+ log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
+ throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
+ }
+ if (proxyRoles.contains(originalPrincipal)) {
+ log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
+ throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+ }
+ }
+ }
+
+ private boolean isProxyRole(String role) {
+ return role != null && conf.getProxyRoles().contains(role);
+ }
+
/**
* Grant authorization-action permission on a tenant to the given client
*
- * @param tenantName
- * @param operation
- * @param originalRole
- * @param role
+ * @param tenantName tenant name
+ * @param operation tenant operation
+ * @param role role name
* @param authData
* additional authdata in json for targeted authorization provider
* @return IllegalArgumentException when tenant not found
* @throws IllegalStateException
* when failed to grant permission
*/
- public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, TenantOperation operation,
- String originalRole, String role,
- AuthenticationDataSource authData) {
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
+ TenantOperation operation,
+ String role,
+ AuthenticationDataSource authData) {
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
- return provider.allowTenantOperationAsync(tenantName, originalRole, role, operation, authData);
+ return provider.allowTenantOperationAsync(tenantName, role, operation, authData);
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
"allowTenantOperationAsync"));
}
- public Boolean allowTenantOperation(String tenantName, TenantOperation operation, String orignalRole, String role,
- AuthenticationDataSource authData) {
- if (!this.conf.isAuthorizationEnabled()) {
- return true;
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
+ TenantOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (isProxyRole(role)) {
+ CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
+ tenantName, operation, role, authData);
+ CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTenantOperationAsync(
+ tenantName, operation, originalRole, authData);
+ return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+ (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
+ } else {
+ return allowTenantOperationAsync(tenantName, operation, role, authData);
}
+ }
- if (provider != null) {
- return provider.allowTenantOperation(tenantName, orignalRole, role, operation, authData);
+ public boolean allowTenantOperation(String tenantName,
+ TenantOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTenantOperationAsync(
+ tenantName, operation, originalRole, role, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
}
-
- throw new IllegalStateException("No authorization provider configured for allowTenantOperation");
}
/**
@@ -387,7 +426,6 @@ public class AuthorizationService {
*
* @param namespaceName
* @param operation
- * @param originalRole
* @param role
* @param authData
* additional authdata in json for targeted authorization provider
@@ -397,31 +435,51 @@ public class AuthorizationService {
*/
public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
NamespaceOperation operation,
- String originalRole, String role,
+ String role,
AuthenticationDataSource authData) {
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
- return provider.allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData);
+ return provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData);
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
"allowNamespaceOperationAsync"));
}
- public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation,
- String originalPrincipal, String role, AuthenticationDataSource authData) {
- if (!this.conf.isAuthorizationEnabled()) {
- return true;
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+ NamespaceOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (isProxyRole(role)) {
+ CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
+ namespaceName, operation, role, authData);
+ CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespaceOperationAsync(
+ namespaceName, operation, originalRole, authData);
+ return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+ (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
+ } else {
+ return allowNamespaceOperationAsync(namespaceName, operation, role, authData);
}
+ }
- if (provider != null) {
- return provider.allowNamespaceOperation(namespaceName, originalPrincipal, role, operation, authData);
+ public boolean allowNamespaceOperation(NamespaceName namespaceName,
+ NamespaceOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ try {
+ return allowNamespaceOperationAsync(
+ namespaceName, operation, originalRole, role, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
}
-
- throw new IllegalStateException("No authorization provider configured for allowNamespaceOperation");
}
/**
@@ -429,7 +487,6 @@ public class AuthorizationService {
*
* @param namespaceName
* @param operation
- * @param originalRole
* @param role
* @param authData
* additional authdata in json for targeted authorization provider
@@ -437,33 +494,56 @@ public class AuthorizationService {
* @throws IllegalStateException
* when failed to grant permission
*/
- public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
- PolicyOperation operation, String originalRole,
- String role, AuthenticationDataSource authData) {
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String role,
+ AuthenticationDataSource authData) {
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
- return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData);
+ return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData);
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
"allowNamespacePolicyOperationAsync"));
}
- public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,
- PolicyOperation operation, String originalPrincipal, String role,
- AuthenticationDataHttps authData) {
- if (!this.conf.isAuthorizationEnabled()) {
- return true;
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (isProxyRole(role)) {
+ CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
+ namespaceName, policy, operation, role, authData);
+ CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync(
+ namespaceName, policy, operation, originalRole, authData);
+ return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+ (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
+ } else {
+ return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData);
}
+ }
- if (provider != null) {
- return provider.allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal, role, authData);
+ public boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ try {
+ return allowNamespacePolicyOperationAsync(
+ namespaceName, policy, operation, originalRole, role, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
}
-
- throw new IllegalStateException("No authorization provider configured for allowNamespacePolicyOperation");
}
/**
@@ -478,32 +558,43 @@ public class AuthorizationService {
* @throws IllegalStateException
* when failed to grant permission
*/
- public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, TopicOperation operation,
- String originalRole, String role,
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
+ TopicOperation operation,
+ String role,
AuthenticationDataSource authData) {
+ if (log.isDebugEnabled()) {
+ log.debug("Check if role {} is allowed to execute topic operation {} on topic {}",
+ role, operation, topicName);
+ }
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
- return provider.allowTopicOperationAsync(topicName, originalRole, role, operation, authData);
+ CompletableFuture<Boolean> allowFuture =
+ provider.allowTopicOperationAsync(topicName, role, operation, authData);
+ if (log.isDebugEnabled()) {
+ return allowFuture.whenComplete((allowed, exception) -> {
+ if (exception == null) {
+ if (allowed) {
+ log.debug("Topic operation {} on topic {} is allowed: role = {}",
+ operation, topicName, role);
+ } else{
+ log.debug("Topic operation {} on topic {} is NOT allowed: role = {}",
+ operation, topicName, role);
+ }
+ } else {
+ log.debug("Failed to check if topic operation {} on topic {} is allowed:"
+ + " role = {}",
+ operation, topicName, role, exception);
+ }
+ });
+ } else {
+ return allowFuture;
+ }
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
"allowTopicOperationAsync"));
}
-
- public Boolean allowTopicOperation(TopicName topicName, TopicOperation operation,
- String orignalRole, String role,
- AuthenticationDataSource authData) {
- if (!this.conf.isAuthorizationEnabled()) {
- return true;
- }
-
- if (provider != null) {
- return provider.allowTopicOperation(topicName, orignalRole, role, operation, authData);
- }
-
- throw new IllegalStateException("No authorization provider configured for allowTopicOperation");
- }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index d7cea44..66d0c2e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.authorization;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
import java.io.IOException;
import java.util.Collections;
@@ -29,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import com.google.common.base.Joiner;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -39,8 +39,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
-import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
-
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -528,38 +526,43 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
}
@Override
- public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
- TenantOperation operation,
- AuthenticationDataSource authData) {
- return validateTenantAdminAccess(tenantName, originalRole, role, authData);
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
+ String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(tenantName, role, authData);
}
@Override
- public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
- String role, NamespaceOperation operation,
- AuthenticationDataSource authData) {
- return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
}
@Override
- public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
- PolicyOperation operation, String originalRole,
- String role, AuthenticationDataSource authData) {
- return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String role,
+ AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
}
@Override
- public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, String originalRole, String role,
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
+ String role,
TopicOperation operation,
AuthenticationDataSource authData) {
CompletableFuture<Boolean> isAuthorizedFuture;
switch (operation) {
- case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData);
+ case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, authData);
break;
- case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData);
+ case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, role, authData);
break;
- case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData, authData.getSubscription());
+ case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
break;
default: isAuthorizedFuture = FutureUtil.failedFuture(
new IllegalStateException("TopicOperation is not supported."));
@@ -568,7 +571,14 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, authData, conf);
return isSuperUserFuture
- .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> isSuperUser || isAuthorized);
+ .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Verify if role {} is allowed to {} to topic {}:"
+ + " isSuperUser={}, isAuthorized={}",
+ role, operation, topicName, isSuperUser, isAuthorized);
+ }
+ return isSuperUser || isAuthorized;
+ });
}
private static String path(String... parts) {
@@ -578,43 +588,20 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
return sb.toString();
}
- private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String originalRole, String role,
+ private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName,
+ String role,
AuthenticationDataSource authData) {
try {
TenantInfo tenantInfo = configCache.propertiesCache()
.get(path(POLICIES, tenantName))
.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
- validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
-
- if (role != null && conf.getProxyRoles().contains(role)) {
- // role check
- CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
- CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
- CompletableFuture<Boolean> isRoleAuthorizedFuture = isRoleSuperUserFuture
- .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
- isRoleSuperUser || isRoleTenantAdmin);
-
- // originalRole check
- CompletableFuture<Boolean> isOriginalRoleSuperUserFuture = isSuperUser(originalRole, authData, conf);
- CompletableFuture<Boolean> isOriginalRoleTenantAdminFuture = isTenantAdmin(tenantName, originalRole,
- tenantInfo, authData);
- CompletableFuture<Boolean> isOriginalRoleAuthorizedFuture = isOriginalRoleSuperUserFuture
- .thenCombine(isOriginalRoleTenantAdminFuture, (isOriginalRoleSuperUser, isOriginalRoleTenantAdmin) ->
- isOriginalRoleSuperUser || isOriginalRoleTenantAdmin);
-
- // merging
- return isRoleAuthorizedFuture
- .thenCombine(isOriginalRoleAuthorizedFuture, (isRoleAuthorized, isOriginalRoleAuthorized) ->
- isRoleAuthorized && isOriginalRoleAuthorized);
- } else {
- // role check
- CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
- CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
- return isRoleSuperUserFuture
- .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
- isRoleSuperUser || isRoleTenantAdmin);
- }
+ // role check
+ CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
+ CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+ return isRoleSuperUserFuture
+ .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+ isRoleSuperUser || isRoleTenantAdmin);
} catch (KeeperException.NoNodeException e) {
log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
@@ -624,18 +611,4 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
}
}
- private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
- String originalPrincipal) {
- if (proxyRoles.contains(authenticatedPrincipal)) {
- // Request has come from a proxy
- if (StringUtils.isBlank(originalPrincipal)) {
- log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
- throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
- }
- if (proxyRoles.contains(originalPrincipal)) {
- log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
- throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
- }
- }
- }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index 6b4fc8c..8555f34 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.web;
-import static com.google.common.base.Preconditions.checkState;
-
import java.io.IOException;
import javax.servlet.Filter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 612c7a3..e66d29c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2617,6 +2617,36 @@ public class PersistentTopicsBase extends AdminResource {
}
/**
+ * Get partitioned topic metadata without checking the permission.
+ */
+ public static CompletableFuture<PartitionedTopicMetadata> unsafeGetPartitionedTopicMetadataAsync(
+         PulsarService pulsar, TopicName topicName) {
+     CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
+
+     // For a global namespace, first verify that it includes the local cluster or a peer cluster: if so
+     // the lookup can be served or redirected; otherwise the partitioned-metadata request fails, and the
+     // client fails while creating the producer/consumer.
+     checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
+             .thenCompose(res -> pulsar.getBrokerService()
+                     .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+             .thenAccept(metadata -> {
+                 if (log.isDebugEnabled()) {
+                     log.debug("Total number of partitions for topic {} is {}", topicName,
+                             metadata.partitions);
+                 }
+                 metadataFuture.complete(metadata);
+             }).exceptionally(ex -> {
+                 // Surface the underlying failure rather than the CompletionException wrapper, but never
+                 // pass null: completeExceptionally(null) throws and would leave metadataFuture pending.
+                 Throwable cause = ex.getCause();
+                 metadataFuture.completeExceptionally(cause != null ? cause : ex);
+                 return null;
+             });
+
+     return metadataFuture;
+ }
+
+ /**
* Get the Topic object reference from the Pulsar broker
*/
private Topic getTopicReference(TopicName topicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 2131b18..03bbe12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -152,7 +152,7 @@ public class Consumer {
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
this.appId = appId;
- this.authenticationData = cnx.authenticationData;
+ this.authenticationData = cnx.getAuthenticationData();
this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 771cd21..258186b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -101,7 +101,7 @@ public class Producer {
this.epoch = epoch;
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
- this.authenticationData = cnx.authenticationData;
+ this.authenticationData = cnx.getAuthenticationData();
this.msgIn = new Rate();
this.chuckedMessageRate = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 15b3c97..8949db4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.getPartitionedTopicMetadata;
+import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
@@ -141,6 +142,7 @@ public class ServerCnx extends PulsarHandler {
// In case of proxy, if the authentication credentials are forwardable,
// it will hold the credentials of the original client
AuthenticationState originalAuthState;
+ AuthenticationDataSource originalAuthData;
private boolean pendingAuthChallengeResponse = false;
// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
@@ -273,6 +275,65 @@ public class ServerCnx extends PulsarHandler {
// // Incoming commands handling
// ////
+ /**
+  * Checks whether this connection may perform {@code operation} on {@code topicName}.
+  * Both the original principal (when one is set) and the connection's own role must be authorized.
+  */
+ private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation) {
+     if (!service.isAuthorizationEnabled()) {
+         // Authorization is off: every operation is allowed.
+         return CompletableFuture.completedFuture(true);
+     }
+     // Check the original principal first (if any), then the role this connection authenticated as.
+     final CompletableFuture<Boolean> originalRoleCheck = originalPrincipal == null
+             ? CompletableFuture.completedFuture(true)
+             : service.getAuthorizationService().allowTopicOperationAsync(
+                     topicName, operation, originalPrincipal, getAuthenticationData());
+     final CompletableFuture<Boolean> authRoleCheck = service.getAuthorizationService()
+             .allowTopicOperationAsync(topicName, operation, authRole, authenticationData);
+     return originalRoleCheck.thenCombine(authRoleCheck, (originalRoleAllowed, authRoleAllowed) -> {
+         if (!originalRoleAllowed) {
+             log.error("OriginalRole {} is not authorized to perform operation {} on topic {}",
+                     originalPrincipal, operation, topicName);
+         }
+         if (!authRoleAllowed) {
+             log.error("Role {} is not authorized to perform operation {} on topic {}",
+                     authRole, operation, topicName);
+         }
+         return originalRoleAllowed && authRoleAllowed;
+     });
+ }
+
+ /**
+  * Checks whether this connection may perform {@code operation} on {@code topicName} for a subscription.
+  *
+  * <p>When authorization is enabled, the subscription name is first recorded on the connection's
+  * authentication data (and on the original client's authentication data, if present). The role
+  * checks themselves are performed by the two-argument overload.
+  *
+  * @param topicName the topic being accessed
+  * @param subscriptionName the subscription the operation targets
+  * @param operation the topic operation to authorize
+  * @return a future that completes with {@code true} if the operation is allowed
+  */
+ private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName,
+                                                            String subscriptionName,
+                                                            TopicOperation operation) {
+     if (service.isAuthorizationEnabled()) {
+         if (authenticationData == null) {
+             authenticationData = new AuthenticationDataCommand("", subscriptionName);
+         } else {
+             authenticationData.setSubscription(subscriptionName);
+         }
+         if (originalAuthData != null) {
+             originalAuthData.setSubscription(subscriptionName);
+         }
+     }
+     // The two-argument overload already yields true when authorization is disabled, so the separate
+     // always-true futures (and their unreachable denial logging) are not needed here.
+     return isTopicOperationAllowed(topicName, operation);
+ }
+
@Override
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();
@@ -297,18 +358,10 @@ public class ServerCnx extends PulsarHandler {
lookupSemaphore.release();
return;
}
- CompletableFuture<Boolean> isProxyAuthorizedFuture;
- if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
- TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
- } else {
- isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
- }
- String finalOriginalPrincipal = originalPrincipal;
- isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
- if (isProxyAuthorized) {
+ isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
+ if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
- finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
+ getPrincipal(), getAuthenticationData(),
requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
@@ -324,14 +377,14 @@ public class ServerCnx extends PulsarHandler {
});
} else {
final String msg = "Proxy Client is not authorized to Lookup";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
}
return null;
}).exceptionally(ex -> {
final String msg = "Exception occured while trying to authorize lookup";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName, ex);
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName, ex);
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
return null;
@@ -369,19 +422,10 @@ public class ServerCnx extends PulsarHandler {
lookupSemaphore.release();
return;
}
- CompletableFuture<Boolean> isProxyAuthorizedFuture;
- if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
- TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
- } else {
- isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
- }
- String finalOriginalPrincipal = originalPrincipal;
- isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
- if (isProxyAuthorized) {
- getPartitionedTopicMetadata(getBrokerService().pulsar(),
- authRole, finalOriginalPrincipal, authenticationData,
- topicName).handle((metadata, ex) -> {
+ isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
+ if (isAuthorized) {
+ unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
+ .handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
@@ -407,7 +451,7 @@ public class ServerCnx extends PulsarHandler {
});
} else {
final String msg = "Proxy Client is not authorized to Get Partition Metadata";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
ctx.writeAndFlush(
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
@@ -415,7 +459,7 @@ public class ServerCnx extends PulsarHandler {
return null;
}).exceptionally(ex -> {
final String msg = "Exception occured while trying to authorize get Partition Metadata";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
return null;
@@ -505,6 +549,9 @@ public class ServerCnx extends PulsarHandler {
String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
AuthData brokerData = authState.authenticate(clientData);
+ if (log.isDebugEnabled()) {
+ log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole);
+ }
if (authState.isComplete()) {
// Authentication has completed. It was either:
@@ -520,7 +567,7 @@ public class ServerCnx extends PulsarHandler {
if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
- remoteAddress, authMethod, authRole, originalPrincipal);
+ remoteAddress, authMethod, this.authRole, originalPrincipal);
}
if (state != State.Connected) {
@@ -607,8 +654,12 @@ public class ServerCnx extends PulsarHandler {
checkArgument(state == State.Start);
if (log.isDebugEnabled()) {
- log.debug("Received CONNECT from {}, auth enabled: {}",
- remoteAddress, service.isAuthenticationEnabled());
+ log.debug("Received CONNECT from {}, auth enabled: {}:"
+ + " has original principal = {}, original principal = {}",
+ remoteAddress,
+ service.isAuthenticationEnabled(),
+ connect.hasOriginalPrincipal(),
+ connect.getOriginalPrincipal());
}
String clientVersion = connect.getClientVersion();
@@ -656,6 +707,12 @@ public class ServerCnx extends PulsarHandler {
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
authenticationData = authState.getAuthDataSource();
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authenticate role : {}", remoteAddress,
+ authState != null ? authState.getAuthRole() : null);
+ }
+
state = doAuthentication(clientData, clientProtocolVersion, clientVersion);
// This will fail the check if:
@@ -684,9 +741,18 @@ public class ServerCnx extends PulsarHandler {
AuthData.of(connect.getOriginalAuthData().getBytes()),
remoteAddress,
sslSession);
+ originalAuthData = originalAuthState.getAuthDataSource();
originalPrincipal = originalAuthState.getAuthRole();
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authenticate original role : {}", remoteAddress, originalPrincipal);
+ }
} else {
originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authenticate original role (forwarded from proxy): {}",
+ remoteAddress, originalPrincipal);
+ }
}
} catch (Exception e) {
String msg = "Unable to authenticate";
@@ -737,6 +803,11 @@ public class ServerCnx extends PulsarHandler {
return;
}
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
+ remoteAddress, authRole, originalPrincipal);
+ }
+
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided while subscribing ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
@@ -765,33 +836,15 @@ public class ServerCnx extends PulsarHandler {
final boolean forceTopicCreation = subscribe.getForceTopicCreation();
final PulsarApi.KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null;
- CompletableFuture<Boolean> isProxyAuthorizedFuture;
- if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- authenticationData.setSubscription(subscriptionName);
- isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
- TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
- } else {
- isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
- }
- isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
- if (isProxyAuthorized) {
- CompletableFuture<Boolean> authorizationFuture;
- if (service.isAuthorizationEnabled()) {
- if (authenticationData == null) {
- authenticationData = new AuthenticationDataCommand("", subscriptionName);
- } else {
- authenticationData.setSubscription(subscriptionName);
- }
- authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
- TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
- } else {
- authorizationFuture = CompletableFuture.completedFuture(true);
- }
-
- authorizationFuture.thenApply(isAuthorized -> {
+ CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
+ topicName,
+ subscriptionName,
+ TopicOperation.CONSUME
+ );
+ isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
+ log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, getPrincipal());
}
log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
@@ -919,24 +972,12 @@ public class ServerCnx extends PulsarHandler {
});
} else {
String msg = "Client is not authorized to subscribe";
- log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+ log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
- }).exceptionally(e -> {
- String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
- log.warn(msg);
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
- return null;
- });
- } else {
- final String msg = "Proxy Client is not authorized to subscribe";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
- }
- return null;
}).exceptionally(ex -> {
- String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+ String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
if (ex.getCause() instanceof PulsarServerException) {
log.info(msg);
} else {
@@ -989,27 +1030,13 @@ public class ServerCnx extends PulsarHandler {
return;
}
- CompletableFuture<Boolean> isProxyAuthorizedFuture;
- if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
- TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
- } else {
- isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
- }
- isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
- if (isProxyAuthorized) {
- CompletableFuture<Boolean> authorizationFuture;
- if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
- TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
- } else {
- authorizationFuture = CompletableFuture.completedFuture(true);
- }
-
- authorizationFuture.thenApply(isAuthorized -> {
+ CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
+ topicName, TopicOperation.PRODUCE
+ );
+ isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
+ log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, getPrincipal());
}
CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId,
@@ -1092,7 +1119,7 @@ public class ServerCnx extends PulsarHandler {
});
schemaVersionFuture.thenAccept(schemaVersion -> {
- Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
+ Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, getPrincipal(),
isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName);
try {
@@ -1153,24 +1180,12 @@ public class ServerCnx extends PulsarHandler {
});
} else {
String msg = "Client is not authorized to Produce";
- log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+ log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
- }).exceptionally(e -> {
- String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
- log.warn(msg);
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
- return null;
- });
- } else {
- final String msg = "Proxy Client is not authorized to Produce";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
- }
- return null;
}).exceptionally(ex -> {
- String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+ String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
log.warn(msg);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
return null;
@@ -2043,7 +2058,11 @@ public class ServerCnx extends PulsarHandler {
}
public AuthenticationDataSource getAuthenticationData() {
- return authenticationData;
+ return originalAuthData != null ? originalAuthData : authenticationData;
+ }
+
+ public String getPrincipal() {
+ return originalPrincipal != null ? originalPrincipal : authRole;
}
public AuthenticationProvider getAuthenticationProvider() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index fb7877e..edbc93c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -68,7 +68,6 @@ import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
-import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -422,10 +421,12 @@ public abstract class PulsarWebResource {
* will throw an exception to redirect to assigned owner or leader; if authoritative is true then it will try to
* acquire all the namespace bundles.
*
- * @param fqnn
- * @param authoritative
- * @param readOnly
- * @param bundleData
+ * @param tenant tenant name
+ * @param cluster cluster name
+ * @param namespace namespace name
+ * @param authoritative if it is an authoritative request
+ * @param readOnly if the request is read-only
+ * @param bundleData bundle data
*/
protected void validateNamespaceOwnershipWithBundles(String tenant, String cluster, String namespace,
boolean authoritative, boolean readOnly, BundlesData bundleData) {
@@ -582,11 +583,8 @@ public abstract class PulsarWebResource {
* client to the appropriate broker. If no broker owns the namespace yet, this function will try to acquire the
* ownership by default.
*
+ * @param topicName topic name
* @param authoritative
- *
- * @param tenant
- * @param cluster
- * @param namespace
*/
protected void validateTopicOwnership(TopicName topicName, boolean authoritative) {
NamespaceService nsService = pulsar().getNamespaceService();
@@ -794,31 +792,33 @@ public abstract class PulsarWebResource {
protected static final int NOT_IMPLEMENTED = 501;
public void validateTenantOperation(String tenant, TenantOperation operation) {
- if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled()
+ && pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
}
- Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
- .allowTenantOperation(
- tenant, operation, originalPrincipal(), clientAppId(), clientAuthData());
-
+ boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowTenantOperation(tenant, operation, originalPrincipal(), clientAppId(), clientAuthData());
if (!isAuthorized) {
- throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTenantOperation for" +
- " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]",
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Unauthorized to validateTenantOperation for"
+ + " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]",
originalPrincipal(), clientAppId(), operation.toString(), tenant));
}
}
}
public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation) {
- if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled()
+ && pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
- Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
- .allowNamespaceOperation(namespaceName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+ boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowNamespaceOperation(namespaceName, operation, originalPrincipal(),
+ clientAppId(), clientAuthData());
if (!isAuthorized) {
throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespaceOperation for" +
@@ -827,14 +827,18 @@ public abstract class PulsarWebResource {
}
}
- public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {
- if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ public void validateNamespacePolicyOperation(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled()
+ && pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
- Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
- .allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal(), clientAppId(), clientAuthData());
+ boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowNamespacePolicyOperation(namespaceName, policy, operation,
+ originalPrincipal(), clientAppId(), clientAuthData());
if (!isAuthorized) {
throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespacePolicyOperation for" +
@@ -842,20 +846,4 @@ public abstract class PulsarWebResource {
}
}
}
-
- public void validateTopicOperation(TopicName topicName, TopicOperation operation) {
- if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
- if (!isClientAuthenticated(clientAppId())) {
- throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
- }
-
- Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
- .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), clientAuthData());
-
- if (!isAuthorized) {
- throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" +
- " operation [%s] on topic [%s]", operation.toString(), topicName));
- }
- }
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 94a02f6..fd56340 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -180,6 +181,8 @@ public class ServerCnxTest {
doReturn(zkCache).when(pulsar).getLocalZkCacheService();
brokerService = spy(new BrokerService(pulsar));
+ BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
+ doReturn(interceptor).when(brokerService).getInterceptor();
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(executor).when(pulsar).getOrderedExecutor();
@@ -474,7 +477,7 @@ public class ServerCnxTest {
public void testProducerCommandWithAuthorizationPositive() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
resetChannel();
@@ -605,7 +608,7 @@ public class ServerCnxTest {
public void testProducerCommandWithAuthorizationNegative() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1195,7 +1198,7 @@ public class ServerCnxTest {
public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1217,7 +1220,7 @@ public class ServerCnxTest {
public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 180142c..687b08d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -22,20 +22,18 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.fail;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.io.IOException;
-import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
import javax.naming.AuthenticationException;
import lombok.Cleanup;
-
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
@@ -54,16 +52,12 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
@@ -435,7 +429,8 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
}
@Override
- public CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
+ public CompletableFuture<Boolean> isSuperUser(String role,
+ ServiceConfiguration serviceConfiguration) {
Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
}
@@ -509,32 +504,38 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
}
@Override
- public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ public CompletableFuture<Boolean> allowTenantOperationAsync(
+ String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
return CompletableFuture.completedFuture(true);
}
@Override
- public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ public Boolean allowTenantOperation(
+ String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
return true;
}
@Override
- public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(
+ NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
return CompletableFuture.completedFuture(true);
}
@Override
- public Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ public Boolean allowNamespaceOperation(
+ NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
return null;
}
@Override
- public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ public CompletableFuture<Boolean> allowTopicOperationAsync(
+ TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
return CompletableFuture.completedFuture(true);
}
@Override
- public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ public Boolean allowTopicOperation(
+ TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
return true;
}
}
@@ -566,18 +567,10 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
@Override
- public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
- try {
- return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
- } catch (InterruptedException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- throw new RestException(e.getCause());
- }
- }
-
- @Override
- public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
if (authData.hasSubscription()) {
String subscription = authData.getSubscription();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 56a933b..697ddf9 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -62,7 +62,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class AdminProxyHandler extends ProxyServlet {
+
private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class);
+
+ private static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
+
private static final Set<String> functionRoutes = new HashSet<>(Arrays.asList(
"/admin/v3/function",
"/admin/v2/function",
@@ -334,7 +338,7 @@ class AdminProxyHandler extends ProxyServlet {
super.addProxyHeaders(clientRequest, proxyRequest);
String user = (String) clientRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
if (user != null) {
- proxyRequest.header("X-Original-Principal", user);
+ proxyRequest.header(ORIGINAL_PRINCIPAL_HEADER, user);
}
}
}