You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/08/05 04:04:19 UTC

[pulsar] branch branch-2.7 updated: [improve][test] Verify the authentication data in the authorization provider (#16945)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 97a91b4988f [improve][test] Verify the authentication data in the authorization provider (#16945)
97a91b4988f is described below

commit 97a91b4988ff77f93f9f74abd0bae00d960713b4
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Fri Aug 5 12:04:13 2022 +0800

    [improve][test] Verify the authentication data in the authorization provider (#16945)
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
---
 .../broker/auth/AuthorizationWithAuthDataTest.java | 291 +++++++++++++++++++++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  36 ++-
 2 files changed, 326 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
new file mode 100644
index 00000000000..0f90aff4597
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+@Test(groups = "broker")
+public class AuthorizationWithAuthDataTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_ROLE = "admin";
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact();
+
+    public static class MyAuthorizationProvider implements AuthorizationProvider {
+
+        public MyAuthorizationProvider() {
+        }
+
+        private void assertRoleAndAuthenticationData(String role, AuthenticationDataSource authenticationData) {
+            assertEquals(role, ADMIN_ROLE);
+            if (authenticationData.hasDataFromHttp()) {
+                String authorization = authenticationData.getHttpHeader("Authorization");
+                assertEquals(authorization, "Bearer " + ADMIN_TOKEN);
+            } else {
+                assertEquals(authenticationData.getCommandData(), ADMIN_TOKEN);
+            }
+        }
+
+        @Override
+        public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData,
+                                                      ServiceConfiguration serviceConfiguration) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
+                                                        AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+            // noop
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+                                                          AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+                                                          AuthenticationDataSource authenticationData,
+                                                          String subscription) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+                                                         AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+                                                                AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
+                                                              AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
+                                                            AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+                                                            String role, String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
+                                                                        String subscriptionName, Set<String> roles,
+                                                                        String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace,
+                                                                         String subscriptionName, String role,
+                                                                         String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
+                                                            String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role,
+                                                                    TenantOperation operation,
+                                                                    AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role,
+                                                                       NamespaceOperation operation,
+                                                                       AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                             PolicyName policy,
+                                                                             PolicyOperation operation, String role,
+                                                                             AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role,
+                                                                   TopicOperation operation,
+                                                                   AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public void close() throws IOException {
+            // noop
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters(ADMIN_TOKEN);
+
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthorizationProvider(MyAuthorizationProvider.class.getName());
+    }
+
+    @SneakyThrows
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        super.customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
+        pulsarAdminBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN);
+    }
+
+    @SneakyThrows
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        super.customizeNewPulsarClientBuilder(clientBuilder);
+        clientBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN);
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void testAdmin() throws PulsarAdminException {
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet(configClusterName));
+        admin.tenants().createTenant("test-tenant-1", tenantInfo);
+        admin.namespaces().createNamespace("test-tenant-1/test-namespace-1");
+        String partitionedTopic = UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(partitionedTopic,3);
+        String nonPartitionedTopic = UUID.randomUUID().toString();
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+        admin.lookups().lookupPartitionedTopic(partitionedTopic);
+        admin.lookups().lookupTopic(nonPartitionedTopic);
+    }
+
+    @Test
+    public void testClient() throws PulsarClientException {
+        String topic = UUID.randomUUID().toString();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
+        producer.send(msg);
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test").subscribe();
+        Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(receive);
+        assertEquals(receive.getData(), msg);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ed2753db763..b0e769ded9c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -122,8 +123,15 @@ public abstract class MockedPulsarServiceBaseTest {
         pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
     }
 
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+
+    }
+
     protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
-        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+        ClientBuilder clientBuilder =
+                PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS);
+        customizeNewPulsarClientBuilder(clientBuilder);
+        return clientBuilder.build();
     }
 
     protected final void internalSetupForStatsTest() throws Exception {
@@ -256,9 +264,14 @@ public abstract class MockedPulsarServiceBaseTest {
         PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
                 ? brokerUrl.toString()
                 : brokerUrlTls.toString());
+        customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
         admin = spy(pulsarAdminBuilder.build());
     }
 
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+
+    }
+
     protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
 
         boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
@@ -423,5 +436,26 @@ public abstract class MockedPulsarServiceBaseTest {
         return configuration;
     }
 
+    protected void setupDefaultTenantAndNamespace() throws Exception {
+        final String tenant = "public";
+        final String namespace = tenant + "/default";
+
+        if (!admin.clusters().getClusters().contains(configClusterName)) {
+            ClusterData clusterData = new ClusterData();
+            clusterData.setServiceUrl(pulsar.getWebServiceAddress());
+            admin.clusters().createCluster(configClusterName, clusterData);
+        }
+
+        if (!admin.tenants().getTenants().contains(tenant)) {
+            TenantInfo tenantInfo = new TenantInfo();
+            tenantInfo.setAllowedClusters(Sets.newHashSet(configClusterName));
+            admin.tenants().createTenant(tenant, tenantInfo);
+        }
+
+        if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
+            admin.namespaces().createNamespace(namespace);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }