You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/10 22:16:12 UTC
[pulsar] branch branch-2.7 updated: PLSR-1470: Only auth _errors_ should log at error level (#9325)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 8e97d58 PLSR-1470: Only auth _errors_ should log at error level (#9325)
8e97d58 is described below
commit 8e97d58b47293f84877e10860b50b6be2b2078d0
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Feb 15 06:07:55 2021 +0100
PLSR-1470: Only auth _errors_ should log at error level (#9325)
An authentication failure should not log at error level, as it depends
on the credentials that the client provides, which may well be wrong.
An authentication/authorization error, where it cannot be decided if
credentials are valid or not, is an error that needs operator
attention and therefore should be logged at error level.
Co-authored-by: Ivan Kelly <ik...@splunk.com>
Co-authored-by: Matteo Merli <mm...@splunk.com>
Co-authored-by: Sijie Guo <si...@apache.org>
(cherry picked from commit fd9875c41bdb86afad07413fdeedcc62781e92d2)
---
.../authentication/AuthenticationProviderList.java | 2 +-
.../pulsar/broker/web/AuthenticationFilter.java | 8 +-
.../apache/pulsar/broker/service/ServerCnx.java | 37 +--
.../apache/pulsar/broker/auth/AuthLogsTest.java | 222 ++++++++++++++++
.../pulsar/broker/auth/MockAuthentication.java | 87 +++++++
.../broker/auth/MockAuthenticationProvider.java | 70 +++++
.../broker/auth/MockAuthorizationProvider.java | 289 +++++++++++++++++++++
7 files changed, 696 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
index 2d55f51..a79fabe 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
@@ -179,7 +179,7 @@ public class AuthenticationProviderList implements AuthenticationProvider {
authenticationException = ae;
}
if (states.isEmpty()) {
- log.error("Failed to initialize a new auth state from {}", remoteAddress, authenticationException);
+ log.debug("Failed to initialize a new auth state from {}", remoteAddress, authenticationException);
if (authenticationException != null) {
throw authenticationException;
} else {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index 8555f34..5d3cae4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.web;
import java.io.IOException;
+import javax.naming.AuthenticationException;
+
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -93,7 +95,11 @@ public class AuthenticationFilter implements Filter {
} catch (Exception e) {
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
- LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
+ if (e instanceof AuthenticationException) {
+ LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
+ } else {
+ LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e);
+ }
return;
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 697d4e5..d24bc4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
@@ -61,7 +62,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -400,8 +400,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
return null;
}).exceptionally(ex -> {
+ logAuthException(remoteAddress, "lookup", getPrincipal(), Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to authorize lookup";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName, ex);
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
return null;
@@ -472,9 +472,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
return null;
}).exceptionally(ex -> {
+ logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to authorize get Partition Metadata";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
+ requestId));
lookupSemaphore.release();
return null;
});
@@ -770,12 +771,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
}
} catch (Exception e) {
+ logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
String msg = "Unable to authenticate";
- if (e instanceof AuthenticationException) {
- log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
- } else {
- log.warn("[{}] {}", remoteAddress, msg, e);
- }
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
close();
}
@@ -995,12 +992,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
return null;
}).exceptionally(ex -> {
- String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
- if (ex.getCause() instanceof PulsarServerException) {
- log.info(msg);
- } else {
- log.warn(msg);
- }
+ logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
@@ -1206,8 +1198,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
return null;
}).exceptionally(ex -> {
- String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
- log.warn(msg);
+ logAuthException(remoteAddress, "producer", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
@@ -2267,4 +2258,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
public void execute(Runnable runnable) {
ctx.channel().eventLoop().execute(runnable);
}
+
+ private static void logAuthException(SocketAddress remoteAddress, String operation,
+ String principal, Optional<TopicName> topic, Throwable ex) {
+ String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
+ if (ex instanceof AuthenticationException) {
+ log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
+ remoteAddress, operation, principal, topicString, ex.getMessage());
+ } else {
+ log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
+ remoteAddress, operation, principal, topicString, ex);
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
new file mode 100644
index 0000000..d0323a8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import static org.testng.Assert.fail;
+
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterClass;
+
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException;
+import org.apache.pulsar.client.api.Producer;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test doesn't test much in and off itself.
+ * However it is useful to see which logs are produced when there's an
+ * failure or error in authentication.
+ */
+public class AuthLogsTest extends MockedPulsarServiceBaseTest {
+ private static final Logger log = LoggerFactory.getLogger(AuthLogsTest.class);
+
+ public AuthLogsTest() {
+ super();
+ }
+
+ @BeforeClass
+ @Override
+ public void setup() throws Exception {
+ conf.setClusterName("test");
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthenticationProviders(
+ Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider"));
+ conf.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+ conf.setAuthorizationEnabled(true);
+ conf.setAuthorizationAllowWildcardsMatching(true);
+ conf.setSuperUserRoles(Sets.newHashSet("super"));
+ internalSetup();
+
+ try (PulsarAdmin admin = PulsarAdmin.builder()
+ .authentication(new MockAuthentication("pass.pass"))
+ .serviceHttpUrl(brokerUrl.toString()).build()) {
+ admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+ admin.tenants().createTenant("public",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+ }
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ internalCleanup();
+ }
+
+ @Test
+ public void binaryEndpoint() throws Exception {
+ log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("pass.pass"))
+ .build();
+ Producer<byte[]> producer = client.newProducer().topic("foobar").create();
+ Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+ .subscriptionName("foobar").subscribe()) {
+ }
+
+ log.info("LOG_TEST_PRODUCER_AUTHN_FAIL");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("fail.ignored"))
+ .build();
+ Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+ fail("Should fail auth");
+ } catch (AuthenticationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_PRODUCER_AUTHN_ERROR");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("error.ignored"))
+ .build();
+ Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+ fail("Should fail auth");
+ } catch (AuthenticationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_CONSUMER_AUTHN_FAIL");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("fail.ignored"))
+ .build();
+ Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+ .subscriptionName("foobar").subscribe()) {
+ fail("Should fail auth");
+ } catch (AuthenticationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_CONSUMER_AUTHN_ERROR");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("error.ignored"))
+ .build();
+ Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+ .subscriptionName("foobar").subscribe()) {
+ fail("Should fail auth");
+ } catch (AuthenticationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_PRODUCER_AUTHZ_FAIL");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("pass.fail"))
+ .build();
+ Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+ fail("Should fail auth");
+ } catch (AuthorizationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_PRODUCER_AUTHZ_ERROR");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("pass.error"))
+ .build();
+ Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
+ fail("Should fail auth");
+ } catch (AuthorizationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_CONSUMER_AUTHZ_FAIL");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("pass.fail"))
+ .build();
+ Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+ .subscriptionName("foobar").subscribe()) {
+ fail("Should fail auth");
+ } catch (AuthorizationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_CONSUMER_AUTHZ_ERROR");
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new MockAuthentication("pass.error"))
+ .build();
+ Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
+ .subscriptionName("foobar").subscribe()) {
+ fail("Should fail auth");
+ } catch (AuthorizationException ae) { /* expected */ }
+
+ log.info("LOG_TEST_CLIENT_DONE");
+ }
+
+ @Test
+ public void httpEndpoint() throws Exception {
+ log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client");
+ try (PulsarAdmin admin = PulsarAdmin.builder()
+ .authentication(new MockAuthentication("pass.pass"))
+ .serviceHttpUrl(brokerUrl.toString()).build()) {
+ admin.namespaces().getNamespaces("public");
+ }
+
+ log.info("LOG_TEST_HTTP_AUTHN_FAIL");
+ try (PulsarAdmin admin = PulsarAdmin.builder()
+ .authentication(new MockAuthentication("fail.ignore"))
+ .serviceHttpUrl(brokerUrl.toString()).build()) {
+ admin.namespaces().getNamespaces("public");
+ fail("Should fail auth");
+ } catch (NotAuthorizedException ae) { /* expected */ }
+
+ log.info("LOG_TEST_HTTP_AUTHN_ERROR");
+ try (PulsarAdmin admin = PulsarAdmin.builder()
+ .authentication(new MockAuthentication("error.ignore"))
+ .serviceHttpUrl(brokerUrl.toString()).build()) {
+ admin.namespaces().getNamespaces("public");
+ fail("Should fail auth");
+ } catch (NotAuthorizedException ae) { /* expected */ }
+
+
+ log.info("LOG_TEST_HTTP_AUTHZ_FAIL");
+ try (PulsarAdmin admin = PulsarAdmin.builder()
+ .authentication(new MockAuthentication("pass.fail"))
+ .serviceHttpUrl(brokerUrl.toString()).build()) {
+ admin.namespaces().getNamespaces("public");
+ fail("Should fail auth");
+ } catch (NotAuthorizedException ae) { /* expected */ }
+
+ log.info("LOG_TEST_HTTP_AUTHZ_ERROR");
+ try (PulsarAdmin admin = PulsarAdmin.builder()
+ .authentication(new MockAuthentication("pass.error"))
+ .serviceHttpUrl(brokerUrl.toString()).build()) {
+ admin.namespaces().getNamespaces("public");
+ fail("Should fail auth");
+ } catch (ServerSideErrorException ae) { /* expected */ }
+
+
+ log.info("LOG_TEST_CLIENT_DONE");
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
new file mode 100644
index 0000000..a63c849
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockAuthentication implements Authentication {
+ private final static Logger log = LoggerFactory.getLogger(MockAuthentication.class);
+ private final String user;
+
+ public MockAuthentication(String user) {
+ this.user = user;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public String getAuthMethodName() {
+ return "mock";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ return new AuthenticationDataProvider() {
+ @Override
+ public boolean hasDataForHttp() { return true; }
+ @Override
+ public String getHttpAuthType() { return "mock"; }
+ @Override
+ public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
+ return ImmutableMap.of("mockuser", user).entrySet();
+ }
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+ @Override
+ public String getCommandData() {
+ return user;
+ }
+ };
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ }
+
+ @Override
+ public void start() throws PulsarClientException {}
+
+
+ @Override
+ public void authenticationStage(String requestUrl,
+ AuthenticationDataProvider authData,
+ Map<String, String> previousResHeaders,
+ CompletableFuture<Map<String, String>> authFuture) {
+ authFuture.complete(null);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java
new file mode 100644
index 0000000..bd6a61f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import java.io.IOException;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockAuthenticationProvider implements AuthenticationProvider {
+ private static Logger log = LoggerFactory.getLogger(MockAuthenticationProvider.class);
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {}
+
+ @Override
+ public String getAuthMethodName() {
+ // method name
+ return "mock";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+ String principal = "unknown";
+ if (authData.hasDataFromHttp()) {
+ principal = authData.getHttpHeader("mockuser");
+ } else if (authData.hasDataFromCommand()) {
+ principal = authData.getCommandData();
+ }
+
+ String[] parts = principal.split("\\.");
+ if (parts.length == 2) {
+ if (parts[0].equals("pass")) {
+ return principal;
+ } else if (parts[0].equals("fail")) {
+ throw new AuthenticationException("Do not pass");
+ } else if (parts[0].equals("error")) {
+ throw new RuntimeException("Error in authn");
+ }
+ }
+ throw new IllegalArgumentException(
+ "Not a valid principle. Should be [pass|fail|error].[pass|fail|error], found " + principal);
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
new file mode 100644
index 0000000..d219f41
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockAuthorizationProvider implements AuthorizationProvider {
+ private static final Logger log = LoggerFactory.getLogger(MockAuthorizationProvider.class);
+
+ @Override
+ public void close() {}
+
+ @Override
+ public CompletableFuture<Boolean> isSuperUser(String role,
+ AuthenticationDataSource authenticationData,
+ ServiceConfiguration serviceConfiguration) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
+ AuthenticationDataSource authenticationData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData,
+ String subscription) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+ String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, Set<String> roles,
+ String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+ String role, String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
+ String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowNamespaceOperation(NamespaceName namespaceName,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+ String originalRole,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowNamespaceOperation(NamespaceName namespaceName,
+ String originalRole,
+ String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String role,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String role,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowTopicOperation(TopicName topicName,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+ String originalRole,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorizedAsync(role);
+ }
+
+ @Override
+ public Boolean allowTopicOperation(TopicName topicName,
+ String originalRole,
+ String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return roleAuthorized(role);
+ }
+
+ CompletableFuture<Boolean> roleAuthorizedAsync(String role) {
+ CompletableFuture<Boolean> promise = new CompletableFuture<>();
+ try {
+ promise.complete(roleAuthorized(role));
+ } catch (Exception e) {
+ promise.completeExceptionally(e);
+ }
+ return promise;
+ }
+
+ boolean roleAuthorized(String role) {
+ String[] parts = role.split("\\.");
+ if (parts.length == 2) {
+ if (parts[1].equals("pass")) {
+ return true;
+ } else if (parts[1].equals("fail")) {
+ return false;
+ } else if (parts[1].equals("error")) {
+ throw new RuntimeException("Error in authn");
+ }
+ }
+ throw new IllegalArgumentException(
+ "Not a valid principle. Should be [pass|fail|error].[pass|fail|error], found " + role);
+ }
+}