You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/10 22:16:12 UTC

[pulsar] branch branch-2.7 updated: PLSR-1470: Only auth _errors_ should log at error level (#9325)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 8e97d58  PLSR-1470: Only auth _errors_ should log at error level (#9325)
8e97d58 is described below

commit 8e97d58b47293f84877e10860b50b6be2b2078d0
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Feb 15 06:07:55 2021 +0100

    PLSR-1470: Only auth _errors_ should log at error level (#9325)
    
    An authentication failure should not log at error level, as it depends
    on the credentials that the client provides, which may well be wrong.
    
    An authentication/authorization error, where it cannot be decided if
    credentials are valid or not, is an error that needs operator
    attention and therefore should be logged at error level.
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
    Co-authored-by: Matteo Merli <mm...@splunk.com>
    Co-authored-by: Sijie Guo <si...@apache.org>
    (cherry picked from commit fd9875c41bdb86afad07413fdeedcc62781e92d2)
---
 .../authentication/AuthenticationProviderList.java |   2 +-
 .../pulsar/broker/web/AuthenticationFilter.java    |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  37 +--
 .../apache/pulsar/broker/auth/AuthLogsTest.java    | 222 ++++++++++++++++
 .../pulsar/broker/auth/MockAuthentication.java     |  87 +++++++
 .../broker/auth/MockAuthenticationProvider.java    |  70 +++++
 .../broker/auth/MockAuthorizationProvider.java     | 289 +++++++++++++++++++++
 7 files changed, 696 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
index 2d55f51..a79fabe 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
@@ -179,7 +179,7 @@ public class AuthenticationProviderList implements AuthenticationProvider {
             authenticationException = ae;
         }
         if (states.isEmpty()) {
-            log.error("Failed to initialize a new auth state from {}", remoteAddress, authenticationException);
+            log.debug("Failed to initialize a new auth state from {}", remoteAddress, authenticationException);
             if (authenticationException != null) {
                 throw authenticationException;
             } else {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index 8555f34..5d3cae4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.web;
 
 import java.io.IOException;
 
+import javax.naming.AuthenticationException;
+
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
@@ -93,7 +95,11 @@ public class AuthenticationFilter implements Filter {
         } catch (Exception e) {
             HttpServletResponse httpResponse = (HttpServletResponse) response;
             httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
-            LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
+            if (e instanceof AuthenticationException) {
+                LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
+            } else {
+                LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e);
+            }
             return;
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 697d4e5..d24bc4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import java.net.SocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -61,7 +62,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -400,8 +400,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 }
                 return null;
             }).exceptionally(ex -> {
+                logAuthException(remoteAddress, "lookup", getPrincipal(), Optional.of(topicName), ex);
                 final String msg = "Exception occurred while trying to authorize lookup";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName, ex);
                 ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
                 lookupSemaphore.release();
                 return null;
@@ -472,9 +472,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 }
                 return null;
             }).exceptionally(ex -> {
+                logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex);
                 final String msg = "Exception occurred while trying to authorize get Partition Metadata";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
-                ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
+                ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
+                        requestId));
                 lookupSemaphore.release();
                 return null;
             });
@@ -770,12 +771,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 }
             }
         } catch (Exception e) {
+            logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
             String msg = "Unable to authenticate";
-            if (e instanceof AuthenticationException) {
-                log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
-            } else {
-                log.warn("[{}] {}", remoteAddress, msg, e);
-            }
             ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
             close();
         }
@@ -995,12 +992,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     }
                     return null;
         }).exceptionally(ex -> {
-            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
-            if (ex.getCause() instanceof PulsarServerException) {
-                log.info(msg);
-            } else {
-                log.warn(msg);
-            }
+            logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
             commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
             return null;
         });
@@ -1206,8 +1198,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     }
                     return null;
         }).exceptionally(ex -> {
-            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
-            log.warn(msg);
+            logAuthException(remoteAddress, "producer", getPrincipal(), Optional.of(topicName), ex);
             commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
             return null;
         });
@@ -2267,4 +2258,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     public void execute(Runnable runnable) {
         ctx.channel().eventLoop().execute(runnable);
     }
+
+    private static void logAuthException(SocketAddress remoteAddress, String operation,
+                                         String principal, Optional<TopicName> topic, Throwable ex) {
+        String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
+        if (ex instanceof AuthenticationException) {
+            log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
+                     remoteAddress, operation, principal, topicString, ex.getMessage());
+        } else {
+            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
+                      remoteAddress, operation, principal, topicString, ex);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
new file mode 100644
index 0000000..d0323a8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import static org.testng.Assert.fail;
+
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterClass;
+
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException;
+import org.apache.pulsar.client.api.Producer;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test doesn't test much in and of itself.
+ * However, it is useful to see which logs are produced when there's a
+ * failure or error in authentication.
+ */
+public class AuthLogsTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(AuthLogsTest.class);
+
+    public AuthLogsTest() {
+        super();
+    }
+
+    @BeforeClass
+    @Override
+    public void setup() throws Exception {
+        conf.setClusterName("test");
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(
+                Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider"));
+        conf.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthorizationAllowWildcardsMatching(true);
+        conf.setSuperUserRoles(Sets.newHashSet("super"));
+        internalSetup();
+
+        try (PulsarAdmin admin = PulsarAdmin.builder()
+             .authentication(new MockAuthentication("pass.pass"))
+             .serviceHttpUrl(brokerUrl.toString()).build()) {
+            admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+            admin.tenants().createTenant("public",
+                                         new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+            admin.namespaces().createNamespace("public/default");
+            admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+        }
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void binaryEndpoint() throws Exception {
+        log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("pass.pass"))
+                .build();
+             Producer<byte[]> producer = client.newProducer().topic("foobar").create();
+             Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+                .subscriptionName("foobar").subscribe()) {
+        }
+
+        log.info("LOG_TEST_PRODUCER_AUTHN_FAIL");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("fail.ignored"))
+                .build();
+             Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+            fail("Should fail auth");
+        } catch (AuthenticationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_PRODUCER_AUTHN_ERROR");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("error.ignored"))
+                .build();
+             Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+            fail("Should fail auth");
+        } catch (AuthenticationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_CONSUMER_AUTHN_FAIL");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("fail.ignored"))
+                .build();
+             Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+                .subscriptionName("foobar").subscribe()) {
+            fail("Should fail auth");
+        } catch (AuthenticationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_CONSUMER_AUTHN_ERROR");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("error.ignored"))
+                .build();
+             Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+                .subscriptionName("foobar").subscribe()) {
+            fail("Should fail auth");
+        } catch (AuthenticationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_PRODUCER_AUTHZ_FAIL");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("pass.fail"))
+                .build();
+             Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+            fail("Should fail auth");
+        } catch (AuthorizationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_PRODUCER_AUTHZ_ERROR");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("pass.error"))
+                .build();
+             Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+            fail("Should fail auth");
+        } catch (AuthorizationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_CONSUMER_AUTHZ_FAIL");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("pass.fail"))
+                .build();
+             Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+                .subscriptionName("foobar").subscribe()) {
+            fail("Should fail auth");
+        } catch (AuthorizationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_CONSUMER_AUTHZ_ERROR");
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new MockAuthentication("pass.error"))
+                .build();
+             Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+                .subscriptionName("foobar").subscribe()) {
+            fail("Should fail auth");
+        } catch (AuthorizationException ae) { /* expected */ }
+
+        log.info("LOG_TEST_CLIENT_DONE");
+    }
+
+    @Test
+    public void httpEndpoint() throws Exception {
+        log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client");
+        try (PulsarAdmin admin = PulsarAdmin.builder()
+                .authentication(new MockAuthentication("pass.pass"))
+                .serviceHttpUrl(brokerUrl.toString()).build()) {
+            admin.namespaces().getNamespaces("public");
+        }
+
+        log.info("LOG_TEST_HTTP_AUTHN_FAIL");
+        try (PulsarAdmin admin = PulsarAdmin.builder()
+                .authentication(new MockAuthentication("fail.ignore"))
+                .serviceHttpUrl(brokerUrl.toString()).build()) {
+            admin.namespaces().getNamespaces("public");
+            fail("Should fail auth");
+        } catch (NotAuthorizedException ae) { /* expected */ }
+
+        log.info("LOG_TEST_HTTP_AUTHN_ERROR");
+        try (PulsarAdmin admin = PulsarAdmin.builder()
+                .authentication(new MockAuthentication("error.ignore"))
+                .serviceHttpUrl(brokerUrl.toString()).build()) {
+            admin.namespaces().getNamespaces("public");
+            fail("Should fail auth");
+        } catch (NotAuthorizedException ae) { /* expected */ }
+
+
+        log.info("LOG_TEST_HTTP_AUTHZ_FAIL");
+        try (PulsarAdmin admin = PulsarAdmin.builder()
+                .authentication(new MockAuthentication("pass.fail"))
+                .serviceHttpUrl(brokerUrl.toString()).build()) {
+            admin.namespaces().getNamespaces("public");
+            fail("Should fail auth");
+        } catch (NotAuthorizedException ae) { /* expected */ }
+
+        log.info("LOG_TEST_HTTP_AUTHZ_ERROR");
+        try (PulsarAdmin admin = PulsarAdmin.builder()
+                .authentication(new MockAuthentication("pass.error"))
+                .serviceHttpUrl(brokerUrl.toString()).build()) {
+            admin.namespaces().getNamespaces("public");
+            fail("Should fail auth");
+        } catch (ServerSideErrorException ae) { /* expected */ }
+
+
+        log.info("LOG_TEST_CLIENT_DONE");
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
new file mode 100644
index 0000000..a63c849
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockAuthentication implements Authentication {
+    private final static Logger log = LoggerFactory.getLogger(MockAuthentication.class);
+    private final String user;
+
+    public MockAuthentication(String user) {
+        this.user = user;
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public String getAuthMethodName() {
+        return "mock";
+    }
+
+    @Override
+    public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        return new AuthenticationDataProvider() {
+            @Override
+            public boolean hasDataForHttp() { return true; }
+            @Override
+            public String getHttpAuthType() { return "mock"; }
+            @Override
+            public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
+                return ImmutableMap.of("mockuser", user).entrySet();
+            }
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+            @Override
+            public String getCommandData() {
+                return user;
+            }
+        };
+    }
+
+    @Override
+    public void configure(Map<String, String> authParams) {
+    }
+
+    @Override
+    public void start() throws PulsarClientException {}
+
+
+    @Override
+    public void authenticationStage(String requestUrl,
+                                     AuthenticationDataProvider authData,
+                                     Map<String, String> previousResHeaders,
+                                     CompletableFuture<Map<String, String>> authFuture) {
+        authFuture.complete(null);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java
new file mode 100644
index 0000000..bd6a61f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import java.io.IOException;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockAuthenticationProvider implements AuthenticationProvider {
+    private static Logger log = LoggerFactory.getLogger(MockAuthenticationProvider.class);
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {}
+
+    @Override
+    public String getAuthMethodName() {
+        // method name
+        return "mock";
+    }
+
+    @Override
+    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+        String principal = "unknown";
+        if (authData.hasDataFromHttp()) {
+            principal = authData.getHttpHeader("mockuser");
+        } else if (authData.hasDataFromCommand()) {
+            principal = authData.getCommandData();
+        }
+
+        String[] parts = principal.split("\\.");
+        if (parts.length == 2) {
+            if (parts[0].equals("pass")) {
+                return principal;
+            } else if (parts[0].equals("fail")) {
+                throw new AuthenticationException("Do not pass");
+            } else if (parts[0].equals("error")) {
+                throw new RuntimeException("Error in authn");
+            }
+        }
+        throw new IllegalArgumentException(
+                "Not a valid principle. Should be [pass|fail|error].[pass|fail|error], found " + principal);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
new file mode 100644
index 0000000..d219f41
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test-only {@link AuthorizationProvider} whose decision is dictated purely by the role name.
+ *
+ * <p>The role is expected to consist of two dot-separated parts. Only the second part is
+ * inspected here:
+ * <ul>
+ *   <li>{@code pass} - the role is authorized;</li>
+ *   <li>{@code fail} - the role is not authorized;</li>
+ *   <li>{@code error} - a {@link RuntimeException} is raised to simulate a provider error.</li>
+ * </ul>
+ * Anything else is reported as an {@link IllegalArgumentException}. All permission checks ignore
+ * the resource and operation they are given; grant/revoke calls are no-ops that complete
+ * successfully.
+ */
+public class MockAuthorizationProvider implements AuthorizationProvider {
+    private static final Logger log = LoggerFactory.getLogger(MockAuthorizationProvider.class);
+
+    /** Nothing to release. */
+    @Override
+    public void close() {}
+
+    @Override
+    public CompletableFuture<Boolean> isSuperUser(String role,
+                                                  AuthenticationDataSource authenticationData,
+                                                  ServiceConfiguration serviceConfiguration) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
+                                                    AuthenticationDataSource authenticationData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    /** No configuration is needed by this mock. */
+    @Override
+    public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+                                                      AuthenticationDataSource authenticationData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+                                                      AuthenticationDataSource authenticationData,
+                                                      String subscription) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+            AuthenticationDataSource authenticationData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+                                                            AuthenticationDataSource authenticationData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
+                                                          AuthenticationDataSource authenticationData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
+                                                        AuthenticationDataSource authenticationData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    /** No-op: permissions are derived from the role name, so there is nothing to store. */
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+                                                        String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** No-op: permissions are derived from the role name, so there is nothing to store. */
+    @Override
+    public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
+                                                                    String subscriptionName, Set<String> roles,
+                                                                    String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** No-op: permissions are derived from the role name, so there is nothing to remove. */
+    @Override
+    public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+                                                                     String role, String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** No-op: permissions are derived from the role name, so there is nothing to store. */
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
+                                                        String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
+                                                                TenantOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
+                                        AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role,
+                                                                TenantOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation,
+                                        AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   String role,
+                                                                   NamespaceOperation operation,
+                                                                   AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                           String role,
+                                           NamespaceOperation operation,
+                                           AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   String originalRole,
+                                                                   String role,
+                                                                   NamespaceOperation operation,
+                                                                   AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                           String originalRole,
+                                           String role,
+                                           NamespaceOperation operation,
+                                           AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         PolicyName policy,
+                                                                         PolicyOperation operation,
+                                                                         String role,
+                                                                         AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+                                                 PolicyName policy,
+                                                 PolicyOperation operation,
+                                                 String role,
+                                                 AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         PolicyName policy,
+                                                                         PolicyOperation operation,
+                                                                         String originalRole,
+                                                                         String role,
+                                                                         AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+                                                 PolicyName policy,
+                                                 PolicyOperation operation,
+                                                 String originalRole,
+                                                 String role,
+                                                 AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+                                                                String role,
+                                                                TopicOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowTopicOperation(TopicName topicName,
+                                        String role,
+                                        TopicOperation operation,
+                                        AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+                                                                String originalRole,
+                                                                String role,
+                                                                TopicOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
+    public Boolean allowTopicOperation(TopicName topicName,
+                                       String originalRole,
+                                       String role,
+                                       TopicOperation operation,
+                                       AuthenticationDataSource authData) {
+        return roleAuthorized(role);
+    }
+
+    /**
+     * Asynchronous flavour of {@link #roleAuthorized(String)}.
+     *
+     * @param role the role to evaluate
+     * @return an already-completed future holding the decision, or completed exceptionally with
+     *         whatever exception {@link #roleAuthorized(String)} threw
+     */
+    CompletableFuture<Boolean> roleAuthorizedAsync(String role) {
+        CompletableFuture<Boolean> promise = new CompletableFuture<>();
+        try {
+            promise.complete(roleAuthorized(role));
+        } catch (Exception e) {
+            // Surface the failure through the future rather than throwing at the caller.
+            promise.completeExceptionally(e);
+        }
+        return promise;
+    }
+
+    /**
+     * Decides authorization from the second dot-separated part of the role name.
+     *
+     * @param role the role to evaluate
+     * @return {@code true} when the second part is {@code pass}, {@code false} when it is {@code fail}
+     * @throws RuntimeException when the second part is {@code error}
+     * @throws IllegalArgumentException when the role is {@code null} or not of the expected form
+     */
+    boolean roleAuthorized(String role) {
+        // A null role is reported as malformed below instead of failing with a
+        // NullPointerException on split().
+        if (role != null) {
+            String[] parts = role.split("\\.");
+            if (parts.length == 2) {
+                if (parts[1].equals("pass")) {
+                    return true;
+                } else if (parts[1].equals("fail")) {
+                    return false;
+                } else if (parts[1].equals("error")) {
+                    // This is the authorization mock, so label the simulated error as authz.
+                    throw new RuntimeException("Error in authz");
+                }
+            }
+        }
+        throw new IllegalArgumentException(
+                "Not a valid principal. Should be [pass|fail|error].[pass|fail|error], found " + role);
+    }
+}