You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/05 18:53:06 UTC

[pulsar] 04/08: [improve][test] Verify the authentication data in the authorization provider (#16900)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 05b16e24ba83cfd4b564f3ad33ad9c4067d6c381
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Thu Aug 4 15:44:28 2022 +0800

    [improve][test] Verify the authentication data in the authorization provider (#16900)
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    (cherry picked from commit cdec98a9a624d47125535e32e8eba2dd4bb4125f)
---
 .../broker/auth/AuthorizationWithAuthDataTest.java | 291 +++++++++++++++++++++
 1 file changed, 291 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
new file mode 100644
index 00000000000..ccb401f74b3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AuthorizationWithAuthDataTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_ROLE = "admin";
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact();
+
+    public static class MyAuthorizationProvider implements AuthorizationProvider {
+
+        public MyAuthorizationProvider() {
+        }
+
+        private void assertRoleAndAuthenticationData(String role, AuthenticationDataSource authenticationData) {
+            assertEquals(role, ADMIN_ROLE);
+            if (authenticationData.hasDataFromHttp()) {
+                String authorization = authenticationData.getHttpHeader("Authorization");
+                assertEquals(authorization, "Bearer " + ADMIN_TOKEN);
+            } else {
+                assertEquals(authenticationData.getCommandData(), ADMIN_TOKEN);
+            }
+        }
+
+        @Override
+        public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData,
+                                                      ServiceConfiguration serviceConfiguration) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
+                                                        AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+                                                          AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+                                                          AuthenticationDataSource authenticationData,
+                                                          String subscription) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+                                                         AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+                                                                AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
+                                                              AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
+                                                            AuthenticationDataSource authenticationData) {
+            assertRoleAndAuthenticationData(role, authenticationData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+                                                            String role, String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
+                                                                        String subscriptionName, Set<String> roles,
+                                                                        String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace,
+                                                                         String subscriptionName, String role,
+                                                                         String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
+                                                            String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role,
+                                                                    TenantOperation operation,
+                                                                    AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role,
+                                                                       NamespaceOperation operation,
+                                                                       AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                             PolicyName policy,
+                                                                             PolicyOperation operation, String role,
+                                                                             AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role,
+                                                                   TopicOperation operation,
+                                                                   AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role,
+                                                                         PolicyName policy, PolicyOperation operation,
+                                                                         AuthenticationDataSource authData) {
+            assertRoleAndAuthenticationData(role, authData);
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public void close() throws IOException {
+            // noop
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters(ADMIN_TOKEN);
+
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthorizationProvider(MyAuthorizationProvider.class.getName());
+    }
+
+    @SneakyThrows
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        super.customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
+        pulsarAdminBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN);
+    }
+
+    @SneakyThrows
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        super.customizeNewPulsarClientBuilder(clientBuilder);
+        clientBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN);
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void testAdmin() throws PulsarAdminException {
+        admin.tenants().createTenant("test-tenant-1",
+                TenantInfo.builder().allowedClusters(Set.of(configClusterName)).build());
+        admin.namespaces().createNamespace("test-tenant-1/test-namespace-1");
+        String partitionedTopic = UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(partitionedTopic,3);
+        String nonPartitionedTopic = UUID.randomUUID().toString();
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+        admin.lookups().lookupPartitionedTopic(partitionedTopic);
+        admin.lookups().lookupTopic(nonPartitionedTopic);
+    }
+
+    @Test
+    public void testClient() throws PulsarClientException {
+        String topic = UUID.randomUUID().toString();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
+        producer.send(msg);
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test").subscribe();
+        Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(receive);
+        assertEquals(receive.getData(), msg);
+    }
+}