You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/08/05 04:04:19 UTC
[pulsar] branch branch-2.7 updated: [improve][test] Verify the authentication data in the authorization provider (#16945)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 97a91b4988f [improve][test] Verify the authentication data in the authorization provider (#16945)
97a91b4988f is described below
commit 97a91b4988ff77f93f9f74abd0bae00d960713b4
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Fri Aug 5 12:04:13 2022 +0800
[improve][test] Verify the authentication data in the authorization provider (#16945)
Signed-off-by: Zixuan Liu <no...@gmail.com>
---
.../broker/auth/AuthorizationWithAuthDataTest.java | 291 +++++++++++++++++++++
.../broker/auth/MockedPulsarServiceBaseTest.java | 36 ++-
2 files changed, 326 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
new file mode 100644
index 00000000000..0f90aff4597
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+@Test(groups = "broker")
+public class AuthorizationWithAuthDataTest extends MockedPulsarServiceBaseTest {
+
+ private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ private static final String ADMIN_ROLE = "admin";
+ private static final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact();
+
+ public static class MyAuthorizationProvider implements AuthorizationProvider {
+
+ public MyAuthorizationProvider() {
+ }
+
+ private void assertRoleAndAuthenticationData(String role, AuthenticationDataSource authenticationData) {
+ assertEquals(role, ADMIN_ROLE);
+ if (authenticationData.hasDataFromHttp()) {
+ String authorization = authenticationData.getHttpHeader("Authorization");
+ assertEquals(authorization, "Bearer " + ADMIN_TOKEN);
+ } else {
+ assertEquals(authenticationData.getCommandData(), ADMIN_TOKEN);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData,
+ ServiceConfiguration serviceConfiguration) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
+ AuthenticationDataSource authenticationData) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ // noop
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData,
+ String subscription) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ assertRoleAndAuthenticationData(role, authenticationData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+ String role, String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, Set<String> roles,
+ String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, String role,
+ String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
+ String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ assertRoleAndAuthenticationData(role, authData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role,
+ NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ assertRoleAndAuthenticationData(role, authData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation, String role,
+ AuthenticationDataSource authData) {
+ assertRoleAndAuthenticationData(role, authData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ assertRoleAndAuthenticationData(role, authData);
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setAuthenticationEnabled(true);
+ conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ conf.setAuthenticationProviders(providers);
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters(ADMIN_TOKEN);
+
+ conf.setAuthorizationEnabled(true);
+ conf.setAuthorizationProvider(MyAuthorizationProvider.class.getName());
+ }
+
+ @SneakyThrows
+ @Override
+ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+ super.customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
+ pulsarAdminBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN);
+ }
+
+ @SneakyThrows
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+ super.customizeNewPulsarClientBuilder(clientBuilder);
+ clientBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN);
+ }
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+
+ @Test
+ public void testAdmin() throws PulsarAdminException {
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Sets.newHashSet(configClusterName));
+ admin.tenants().createTenant("test-tenant-1", tenantInfo);
+ admin.namespaces().createNamespace("test-tenant-1/test-namespace-1");
+ String partitionedTopic = UUID.randomUUID().toString();
+ admin.topics().createPartitionedTopic(partitionedTopic,3);
+ String nonPartitionedTopic = UUID.randomUUID().toString();
+ admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+ admin.lookups().lookupPartitionedTopic(partitionedTopic);
+ admin.lookups().lookupTopic(nonPartitionedTopic);
+ }
+
+ @Test
+ public void testClient() throws PulsarClientException {
+ String topic = UUID.randomUUID().toString();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+ byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
+ producer.send(msg);
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test").subscribe();
+ Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertEquals(receive.getData(), msg);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ed2753db763..b0e769ded9c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -122,8 +123,15 @@ public abstract class MockedPulsarServiceBaseTest {
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
}
+ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+
+ }
+
protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
- return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+ ClientBuilder clientBuilder =
+ PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS);
+ customizeNewPulsarClientBuilder(clientBuilder);
+ return clientBuilder.build();
}
protected final void internalSetupForStatsTest() throws Exception {
@@ -256,9 +264,14 @@ public abstract class MockedPulsarServiceBaseTest {
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString()
: brokerUrlTls.toString());
+ customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
admin = spy(pulsarAdminBuilder.build());
}
+ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+
+ }
+
protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
@@ -423,5 +436,26 @@ public abstract class MockedPulsarServiceBaseTest {
return configuration;
}
+ protected void setupDefaultTenantAndNamespace() throws Exception {
+ final String tenant = "public";
+ final String namespace = tenant + "/default";
+
+ if (!admin.clusters().getClusters().contains(configClusterName)) {
+ ClusterData clusterData = new ClusterData();
+ clusterData.setServiceUrl(pulsar.getWebServiceAddress());
+ admin.clusters().createCluster(configClusterName, clusterData);
+ }
+
+ if (!admin.tenants().getTenants().contains(tenant)) {
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Sets.newHashSet(configClusterName));
+ admin.tenants().createTenant(tenant, tenantInfo);
+ }
+
+ if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
+ admin.namespaces().createNamespace(namespace);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}