You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/10 22:42:36 UTC

[pulsar] branch branch-2.7 updated: [broker] fix `GetTopicsOfNamespace` with binary lookup service not check auth (#11172)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new deb9b0b  [broker] fix `GetTopicsOfNamespace` with binary lookup service not check auth (#11172)
deb9b0b is described below

commit deb9b0be7e8594e0358d1eb6a558f38c578f99ae
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Aug 6 14:32:38 2021 +0800

    [broker] fix `GetTopicsOfNamespace` with binary lookup service not check auth (#11172)
    
    (cherry picked from commit 64b44a5f6eb3ebdb8a3a8c44ddf81c82b12c05c6)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 114 ++++-
 .../impl/PatternTopicsConsumerImplAuthTest.java    | 462 +++++++++++++++++++++
 2 files changed, 559 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d24bc4f..0070c92 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -85,6 +85,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
 import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.CommandUtils;
@@ -1612,6 +1613,36 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         });
     }
 
+    private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
+                                                                   NamespaceOperation operation) {
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        CompletableFuture<Boolean> isAuthorizedFuture;
+        if (service.isAuthorizationEnabled()) {
+            if (originalPrincipal != null) {
+                isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
+                        namespaceName, operation, originalPrincipal, getAuthenticationData());
+            } else {
+                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            }
+            isAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
+                    namespaceName, operation, authRole, authenticationData);
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            isAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
+            if (!isProxyAuthorized) {
+                log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
+                        originalPrincipal, operation, namespaceName);
+            }
+            if (!isAuthorized) {
+                log.warn("Role {} is not authorized to perform operation {} on namespace {}",
+                        authRole, operation, namespaceName);
+            }
+            return isProxyAuthorized && isAuthorized;
+        });
+    }
+
     @Override
     protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
         final long requestId = commandGetTopicsOfNamespace.getRequestId();
@@ -1619,23 +1650,60 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
         final NamespaceName namespaceName = NamespaceName.get(namespace);
 
-        getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
-                .thenAccept(topics -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
-                                remoteAddress, namespace, requestId, topics.size());
-                    }
-                    commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
-                })
-                .exceptionally(ex -> {
-                    log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
-                            remoteAddress, namespace, requestId);
-                    commandSender.sendErrorResponse(requestId,
-                            BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
-                            ex.getMessage());
-
-                    return null;
-                });
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided for getTopicsOfNamespaceRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+                        .thenAccept(topics -> {
+                            if (log.isDebugEnabled()) {
+                                log.debug(
+                                        "[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
+                                        remoteAddress, namespace, requestId, topics.size());
+                            }
+                            commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
+                            lookupSemaphore.release();
+                        })
+                        .exceptionally(ex -> {
+                            log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
+                                    remoteAddress, namespace, requestId);
+                            commandSender.sendErrorResponse(requestId,
+                                    BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
+                                    ex.getMessage());
+                            lookupSemaphore.release();
+                            return null;
+                        });
+                } else {
+                    final String msg = "Proxy Client is not authorized to GetTopicsOfNamespace";
+                    log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName);
+                    commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+                    lookupSemaphore.release();
+                }
+                return null;
+            }).exceptionally(ex -> {
+                logNamespaceNameAuthException(remoteAddress, "GetTopicsOfNamespace", getPrincipal(),
+                        Optional.of(namespaceName), ex);
+                final String msg = "Exception occurred while trying to authorize GetTopicsOfNamespace";
+                commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return null;
+            });
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed GetTopicsOfNamespace lookup due to too many lookup-requests {}", remoteAddress,
+                        namespaceName);
+            }
+            commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
+                    "Failed due to too many pending lookup requests");
+        }
     }
 
     @Override
@@ -2270,4 +2338,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                       remoteAddress, operation, principal, topicString, ex);
         }
     }
+
+    private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation,
+                                         String principal, Optional<NamespaceName> namespaceName, Throwable ex) {
+        String namespaceNameString = namespaceName.map(t -> ", namespace=" + t.toString()).orElse("");
+        if (ex instanceof AuthenticationException) {
+            log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
+                    remoteAddress, operation, principal, namespaceNameString, ex.getMessage());
+        } else {
+            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
+                    remoteAddress, operation, principal, namespaceNameString, ex);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
new file mode 100644
index 0000000..086f969
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+import javax.naming.AuthenticationException;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.RestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
+    private static final long testTimeout = 90000; // 1.5 min
+    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplAuthTest.class);
+    private static final String clientRole = "pluggableRole";
+    private static final String superUserRole = "superUser";
+    private static final Set<String> clientAuthProviderSupportedRoles = Sets.newHashSet(clientRole);
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        // set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
+        isTcpLookup = true;
+
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add(superUserRole);
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(TestAuthenticationProvider.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
+
+        conf.setClusterName("test");
+        super.internalSetup();
+    }
+
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private PulsarAdmin buildAdminClient() throws Exception {
+        Authentication adminAuthentication = new ClientAuthentication(superUserRole);
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl.toString())
+                .authentication(adminAuthentication)
+                .build();
+    }
+
+    // verify binary proto with correct auth check
+    // if with invalid role, consumer for pattern topic subscription will fail
+    @Test(timeOut = testTimeout)
+    public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
+        String key = "BinaryProtoToGetTopics";
+        String subscriptionName = "my-ex-subscription-" + key;
+        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
+        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
+        String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
+        Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");
+
+        @Cleanup
+        PulsarAdmin admin = buildAdminClient();
+
+        String lookupUrl = pulsar.getBrokerServiceUrl();
+
+        Authentication authentication = new ClientAuthentication(clientRole);
+        Authentication authenticationInvalidRole = new ClientAuthentication("test-role");
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
+
+        @Cleanup
+        PulsarClient pulsarClientInvalidRole = PulsarClient.builder().serviceUrl(lookupUrl)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS)
+                .authentication(authenticationInvalidRole).build();
+
+        // 1. create partition and grant permissions
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        admin.namespaces().grantPermissionOnNamespace("my-property/my-ns", clientRole,
+                EnumSet.allOf(AuthAction.class));
+
+        admin.topics().createPartitionedTopic(topicName2, 2);
+        admin.topics().createPartitionedTopic(topicName3, 3);
+
+        // 2. create producers and consumer
+        String messagePredicate = "my-message-" + key + "-";
+        int totalMessages = 30;
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
+                .enableBatching(false)
+                .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+                .create();
+        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
+                .enableBatching(false)
+                .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+                .create();
+        Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
+                .enableBatching(false)
+                .create();
+
+        Consumer<byte[]> consumer;
+        // Invalid user auth-role will be rejected by authorization service
+        try {
+            consumer = pulsarClientInvalidRole.newConsumer()
+                    .topicsPattern(pattern)
+                    .patternAutoDiscoveryPeriod(2)
+                    .subscriptionName(subscriptionName)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .receiverQueueSize(4)
+                    .subscribe();
+            Assert.fail("should have failed with authorization error");
+        } catch (PulsarClientException.AuthorizationException pa) {
+            // Ok
+        }
+
+        // create pattern topics consumer with correct role client
+        consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                .patternAutoDiscoveryPeriod(2)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(4)
+                .subscribe();
+        assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
+
+        // 4. verify consumer
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
+        List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
+
+        assertEquals(topics.size(), 6);
+        assertEquals(consumers.size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
+
+        topics.forEach(topic -> log.debug("topic: {}", topic));
+        consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
+
+        IntStream.range(0, topics.size()).forEach(index ->
+                assertEquals(consumers.get(index).getTopic(), topics.get(index)));
+
+        ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+
+        // 5. produce data
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+            producer4.send((messagePredicate + "producer4-" + i).getBytes());
+        }
+
+        // 6. should receive all the message
+        int messageSet = 0;
+        Message<byte[]> message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+        producer4.close();
+    }
+
+    public static class TestAuthorizationProvider implements AuthorizationProvider {
+        public ServiceConfiguration conf;
+
+        @Override
+        public void close() throws IOException {
+            // No-op
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+            this.conf = conf;
+            // No-op
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+                                                          AuthenticationDataSource authenticationData) {
+            return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+                                                          AuthenticationDataSource authenticationData, String subscription) {
+            return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+                                                         AuthenticationDataSource authenticationData) {
+            return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+                                                            String role, String authenticationData) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(TopicName topicname, Set<AuthAction> actions, String role,
+                                                            String authenticationData) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
+                                                                        String subscriptionName, Set<String> roles, String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace,
+                                                                         String subscriptionName, String role, String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTenantOperationAsync(
+                String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Boolean allowTenantOperation(
+                String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
+            return true;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowNamespaceOperationAsync(
+                NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+            CompletableFuture<Boolean> isAuthorizedFuture;
+
+            if (role.equals(superUserRole) || role.equals(clientRole)) {
+                isAuthorizedFuture = CompletableFuture.completedFuture(true);
+            } else {
+                isAuthorizedFuture = CompletableFuture.completedFuture(false);
+            }
+
+            return isAuthorizedFuture;
+        }
+
+        @Override
+        public Boolean allowNamespaceOperation(
+                NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+            try {
+                return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RestException(e);
+            }
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTopicOperationAsync(
+                TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
+            CompletableFuture<Boolean> isAuthorizedFuture;
+
+            if (role.equals(superUserRole) || role.equals(clientRole)) {
+                isAuthorizedFuture = CompletableFuture.completedFuture(true);
+            } else {
+                isAuthorizedFuture = CompletableFuture.completedFuture(false);
+            }
+
+            return isAuthorizedFuture;
+        }
+
+        @Override
+        public Boolean allowTopicOperation(
+                TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
+            try {
+                return allowTopicOperationAsync(topicName, role, operation, authData).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RestException(e);
+            }
+        }
+    }
+
+    public static class ClientAuthentication implements Authentication {
+        String user;
+
+        public ClientAuthentication(String user) {
+            this.user = user;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No-op
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "test";
+        }
+
+        @Override
+        public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+            AuthenticationDataProvider provider = new AuthenticationDataProvider() {
+                public boolean hasDataForHttp() {
+                    return true;
+                }
+
+                @SuppressWarnings("unchecked")
+                public Set<Map.Entry<String, String>> getHttpHeaders() {
+                    return Sets.newHashSet(Maps.immutableEntry("user", user));
+                }
+
+                public boolean hasDataFromCommand() {
+                    return true;
+                }
+
+                public String getCommandData() {
+                    return user;
+                }
+            };
+            return provider;
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // No-op
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            // No-op
+        }
+
+    }
+
+    public static class TestAuthenticationProvider implements AuthenticationProvider {
+
+        @Override
+        public void close() throws IOException {
+            // no-op
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration config) throws IOException {
+            // No-op
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "test";
+        }
+
+        @Override
+        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+            return authData.getCommandData() != null ? authData.getCommandData() : authData.getHttpHeader("user");
+        }
+    }
+}