You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/12 03:29:41 UTC
[pulsar] 05/20: [broker] fix `GetTopicsOfNamespace` with binary
lookup service not check auth (#11172)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 705aae33d4640f27c9215158a86813a0daed9062
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Aug 6 14:32:38 2021 +0800
[broker] fix `GetTopicsOfNamespace` with binary lookup service not check auth (#11172)
(cherry picked from commit 64b44a5f6eb3ebdb8a3a8c44ddf81c82b12c05c6)
---
.../apache/pulsar/broker/service/ServerCnx.java | 114 ++++-
.../impl/PatternTopicsConsumerImplAuthTest.java | 488 +++++++++++++++++++++
2 files changed, 585 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 26dbd22..e0a056a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -129,6 +129,7 @@ import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
@@ -1722,6 +1723,36 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
});
}
+ private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
+ NamespaceOperation operation) {
+ CompletableFuture<Boolean> isProxyAuthorizedFuture;
+ CompletableFuture<Boolean> isAuthorizedFuture;
+ if (service.isAuthorizationEnabled()) {
+ if (originalPrincipal != null) {
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
+ namespaceName, operation, originalPrincipal, getAuthenticationData());
+ } else {
+ isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+ }
+ isAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
+ namespaceName, operation, authRole, authenticationData);
+ } else {
+ isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+ isAuthorizedFuture = CompletableFuture.completedFuture(true);
+ }
+ return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
+ if (!isProxyAuthorized) {
+ log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
+ originalPrincipal, operation, namespaceName);
+ }
+ if (!isAuthorized) {
+ log.warn("Role {} is not authorized to perform operation {} on namespace {}",
+ authRole, operation, namespaceName);
+ }
+ return isProxyAuthorized && isAuthorized;
+ });
+ }
+
@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
@@ -1729,23 +1760,60 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
final NamespaceName namespaceName = NamespaceName.get(namespace);
- getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
- .thenAccept(topics -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
- remoteAddress, namespace, requestId, topics.size());
- }
- commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
- })
- .exceptionally(ex -> {
- log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
- remoteAddress, namespace, requestId);
- commandSender.sendErrorResponse(requestId,
- BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
- ex.getMessage());
-
- return null;
- });
+ final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+ if (lookupSemaphore.tryAcquire()) {
+ if (invalidOriginalPrincipal(originalPrincipal)) {
+ final String msg = "Valid Proxy Client role should be provided for getTopicsOfNamespaceRequest ";
+ log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
+ authRole, originalPrincipal, namespaceName);
+ commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return;
+ }
+ isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+ if (isAuthorized) {
+ getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ .thenAccept(topics -> {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
+ remoteAddress, namespace, requestId, topics.size());
+ }
+ commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
+ lookupSemaphore.release();
+ })
+ .exceptionally(ex -> {
+ log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
+ remoteAddress, namespace, requestId);
+ commandSender.sendErrorResponse(requestId,
+ BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
+ ex.getMessage());
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ final String msg = "Proxy Client is not authorized to GetTopicsOfNamespace";
+ log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName);
+ commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ }
+ return null;
+ }).exceptionally(ex -> {
+ logNamespaceNameAuthException(remoteAddress, "GetTopicsOfNamespace", getPrincipal(),
+ Optional.of(namespaceName), ex);
+ final String msg = "Exception occurred while trying to authorize GetTopicsOfNamespace";
+ commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed GetTopicsOfNamespace lookup due to too many lookup-requests {}", remoteAddress,
+ namespaceName);
+ }
+ commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
+ "Failed due to too many pending lookup requests");
+ }
}
@Override
@@ -2490,6 +2558,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
}
+ private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation,
+ String principal, Optional<NamespaceName> namespaceName, Throwable ex) {
+ String namespaceNameString = namespaceName.map(t -> ", namespace=" + t.toString()).orElse("");
+ if (ex instanceof AuthenticationException) {
+ log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
+ remoteAddress, operation, principal, namespaceNameString, ex.getMessage());
+ } else {
+ log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
+ remoteAddress, operation, principal, namespaceNameString, ex);
+ }
+ }
+
public boolean hasProducers() {
return !producers.isEmpty();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
new file mode 100644
index 0000000..af47c79
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+import javax.naming.AuthenticationException;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.RestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
+ private static final long testTimeout = 90000; // 1.5 min
+ private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplAuthTest.class);
+ private static final String clientRole = "pluggableRole";
+ private static final String superUserRole = "superUser";
+ private static final Set<String> clientAuthProviderSupportedRoles = Sets.newHashSet(clientRole);
+
+ @Override
+ @BeforeMethod
+ public void setup() throws Exception {
+ // set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
+ isTcpLookup = true;
+
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add(superUserRole);
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(TestAuthenticationProvider.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
+
+ conf.setClusterName("test");
+ super.internalSetup();
+ }
+
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ private PulsarAdmin buildAdminClient() throws Exception {
+ Authentication adminAuthentication = new ClientAuthentication(superUserRole);
+ return PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl.toString())
+ .authentication(adminAuthentication)
+ .build();
+ }
+
+ // verify binary proto with correct auth check
+ // if with invalid role, consumer for pattern topic subscription will fail
+ @Test(timeOut = testTimeout)
+ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
+ String key = "BinaryProtoToGetTopics";
+ String subscriptionName = "my-ex-subscription-" + key;
+ String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+ String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
+ String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
+ String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
+ Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");
+
+ @Cleanup
+ PulsarAdmin admin = buildAdminClient();
+
+ String lookupUrl = pulsar.getBrokerServiceUrl();
+
+ Authentication authentication = new ClientAuthentication(clientRole);
+ Authentication authenticationInvalidRole = new ClientAuthentication("test-role");
+
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
+
+ @Cleanup
+ PulsarClient pulsarClientInvalidRole = PulsarClient.builder().serviceUrl(lookupUrl)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS)
+ .authentication(authenticationInvalidRole).build();
+
+ // 1. create partition and grant permissions
+ admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+
+ admin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+ admin.namespaces().grantPermissionOnNamespace("my-property/my-ns", clientRole,
+ EnumSet.allOf(AuthAction.class));
+
+ admin.topics().createPartitionedTopic(topicName2, 2);
+ admin.topics().createPartitionedTopic(topicName3, 3);
+
+ // 2. create producers and consumer
+ String messagePredicate = "my-message-" + key + "-";
+ int totalMessages = 30;
+
+ Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
+ .enableBatching(false)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+ Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
+ .enableBatching(false)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+ Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
+ .enableBatching(false)
+ .create();
+
+ Consumer<byte[]> consumer;
+ // Invalid user auth-role will be rejected by authorization service
+ try {
+ consumer = pulsarClientInvalidRole.newConsumer()
+ .topicsPattern(pattern)
+ .patternAutoDiscoveryPeriod(2)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(4)
+ .subscribe();
+ Assert.fail("should have failed with authorization error");
+ } catch (PulsarClientException.AuthorizationException pa) {
+ // Ok
+ }
+
+ // create pattern topics consumer with correct role client
+ consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ .patternAutoDiscoveryPeriod(2)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(4)
+ .subscribe();
+ assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
+
+ // 4. verify consumer
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+ List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
+ List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
+
+ assertEquals(topics.size(), 6);
+ assertEquals(consumers.size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
+
+ topics.forEach(topic -> log.debug("topic: {}", topic));
+ consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
+
+ IntStream.range(0, topics.size()).forEach(index ->
+ assertEquals(consumers.get(index).getTopic(), topics.get(index)));
+
+ ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+
+ // 5. produce data
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-" + i).getBytes());
+ producer2.send((messagePredicate + "producer2-" + i).getBytes());
+ producer3.send((messagePredicate + "producer3-" + i).getBytes());
+ producer4.send((messagePredicate + "producer4-" + i).getBytes());
+ }
+
+ // 6. should receive all the message
+ int messageSet = 0;
+ Message<byte[]> message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ producer4.close();
+ }
+
+ public static class TestAuthorizationProvider implements AuthorizationProvider {
+ public ServiceConfiguration conf;
+
+ @Override
+ public void close() throws IOException {
+ // No-op
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ this.conf = conf;
+ // No-op
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+ String role, String authenticationData) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(TopicName topicname, Set<AuthAction> actions, String role,
+ String authenticationData) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, Set<String> roles, String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, String role, String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTenantOperationAsync(
+ String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Boolean allowTenantOperation(
+ String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(
+ NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ CompletableFuture<Boolean> isAuthorizedFuture;
+
+ if (role.equals(superUserRole) || role.equals(clientRole)) {
+ isAuthorizedFuture = CompletableFuture.completedFuture(true);
+ } else {
+ isAuthorizedFuture = CompletableFuture.completedFuture(false);
+ }
+
+ return isAuthorizedFuture;
+ }
+
+ @Override
+ public Boolean allowNamespaceOperation(
+ NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ try {
+ return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RestException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicOperationAsync(
+ TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ CompletableFuture<Boolean> isAuthorizedFuture;
+
+ if (role.equals(superUserRole) || role.equals(clientRole)) {
+ isAuthorizedFuture = CompletableFuture.completedFuture(true);
+ } else {
+ isAuthorizedFuture = CompletableFuture.completedFuture(false);
+ }
+
+ return isAuthorizedFuture;
+ }
+
+ @Override
+ public Boolean allowTopicOperation(
+ TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ try {
+ return allowTopicOperationAsync(topicName, role, operation, authData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RestException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role,
+ PolicyName policy, PolicyOperation operation,
+ AuthenticationDataSource authData) {
+ CompletableFuture<Boolean> isAuthorizedFuture;
+
+ if (role.equals(superUserRole) || role.equals(clientRole)) {
+ isAuthorizedFuture = CompletableFuture.completedFuture(true);
+ } else {
+ isAuthorizedFuture = CompletableFuture.completedFuture(false);
+ }
+
+ return isAuthorizedFuture;
+ }
+
+ @Override
+ public Boolean allowTopicPolicyOperation(TopicName topicName, String role, PolicyName policy,
+ PolicyOperation operation, AuthenticationDataSource authData) {
+ try {
+ return allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RestException(e);
+ }
+ }
+ }
+
+ public static class ClientAuthentication implements Authentication {
+ String user;
+
+ public ClientAuthentication(String user) {
+ this.user = user;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "test";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ AuthenticationDataProvider provider = new AuthenticationDataProvider() {
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Set<Map.Entry<String, String>> getHttpHeaders() {
+ return Sets.newHashSet(Maps.immutableEntry("user", user));
+ }
+
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ public String getCommandData() {
+ return user;
+ }
+ };
+ return provider;
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ // No-op
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // No-op
+ }
+
+ }
+
+ public static class TestAuthenticationProvider implements AuthenticationProvider {
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+ // No-op
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "test";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+ return authData.getCommandData() != null ? authData.getCommandData() : authData.getHttpHeader("user");
+ }
+ }
+}