You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 11:53:27 UTC

[pulsar] 06/09: allow consume permission to do GetTopics op (#12600)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3a31716d422468c187c44a174ea2546222679404
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Nov 5 09:30:57 2021 +0800

    allow consume permission to do GetTopics op (#12600)
    
    Fixes #12423
    
    ### Motivation
    A regex subscription needs to fetch the topic list of the given namespace with a GetTopicsOfNamespace request, but this request requires tenant admin permission, which blocks regex consumers that only have consume permission.
    
    ### Modifications
    This PR adds a consume permission check for GetTopicsOfNamespace, which allows consumers to get the topic list with only the consume permission.
    
    (cherry picked from commit 7e078aad5cb0b07f5e0d609025cf13b934cf28eb)
---
 .../authorization/AuthorizationProvider.java       |  10 +
 .../MultiRolesTokenAuthorizationProvider.java      |   5 +
 .../authorization/PulsarAuthorizationProvider.java |   8 +
 .../broker/auth/MockAuthorizationProvider.java     |   6 +
 .../api/AuthorizationProducerConsumerTest.java     |   5 +
 .../impl/PatternTopicsConsumerImplAuthTest.java    |   5 +
 .../admin/GetTopicsOfNamespaceWithAuthTest.java    | 208 +++++++++++++++++++++
 7 files changed, 247 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index c83ae4c..ca1bfbf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -185,6 +185,16 @@ public interface AuthorizationProvider extends Closeable {
                                                  AuthenticationDataSource authenticationData);
 
     /**
+     * Check whether the given role is allowed to perform consume operations within this namespace.
+     * @param namespaceName the namespace in which the consume operations would be executed
+     * @param role the role to check
+     * @param authenticationData authentication data related to the role
+     * @return a future that completes with {@code true} if the role is authorized, {@code false} otherwise
+     */
+    CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role,
+                                                 AuthenticationDataSource authenticationData);
+
+    /**
      *
      * Grant authorization-action permission on a namespace to the given client
      *
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index dcdf779..89ed834 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -211,6 +211,11 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
     }
 
     @Override
+    public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return authorize(authenticationData, r -> super.allowConsumeOpsAsync(namespaceName, r, authenticationData));
+    }
+
+    @Override
     public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
                                                                 String role,
                                                                 TenantOperation operation,
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index a4d8634..641591c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -227,6 +227,11 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
         return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
     }
 
+    @Override
+    public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.consume);
+    }
+
     private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
                                                                        AuthenticationDataSource authenticationData,
                                                                        AuthAction authAction) {
@@ -525,6 +530,9 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
             case PACKAGES:
                 isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages);
                 break;
+            case GET_TOPICS:
+                isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
+                break;
             default:
                 isAuthorizedFuture = CompletableFuture.completedFuture(false);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
index 3af2568..74ba55e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
@@ -97,6 +97,12 @@ public class MockAuthorizationProvider implements AuthorizationProvider {
     }
 
     @Override
+    public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role,
+                                                    AuthenticationDataSource authenticationData) {
+        // The mock decides on the role alone; namespaceName and authenticationData are not consulted here.
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
     public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
                                                         String authDataJson) {
         return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 0ad79ff..0a799fb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -499,6 +499,11 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
 
         @Override
+        public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
                 String role, String authenticationData) {
             return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index b1328a2..40c31db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -298,6 +298,11 @@ public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
         }
 
         @Override
+        public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
                                                             String role, String authenticationData) {
             return CompletableFuture.completedFuture(null);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetTopicsOfNamespaceWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetTopicsOfNamespaceWithAuthTest.java
new file mode 100644
index 0000000..68de70d
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetTopicsOfNamespaceWithAuthTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.elasticsearch.common.collect.Set;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * GetTopicsOfNamespaceWithAuthTest will test GetTopics operation with and without the proper permission.
+ */
+@Slf4j
+public class GetTopicsOfNamespaceWithAuthTest extends TestRetrySupport {
+
+    private static final String CLUSTER_PREFIX = "get-topics-auth";
+    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
+    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
+
+    private static final String SUPER_USER_ROLE = "super-user";
+    private String superUserAuthToken;
+    private static final String PROXY_ROLE = "proxy";
+    private String proxyAuthToken;
+    private static final String REGULAR_USER_ROLE = "client";
+    private String clientAuthToken;
+    private File publicKeyFile;
+
+    private PulsarCluster pulsarCluster;
+    private PulsarContainer cmdContainer;
+
+    /**
+     * Boots the test environment: starts a helper container, generates the key pair and the
+     * role tokens inside it, then starts a Pulsar cluster (2 bookies, 2 brokers, 1 proxy)
+     * whose brokers and proxy get the public key mounted at
+     * {@link #PUBLIC_KEY_PATH_INSIDE_CONTAINER}.
+     *
+     * @throws Exception propagated from key/token generation or from starting the containers
+     */
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+        // Before starting the cluster, generate the key pair and the tokens.
+        // A ZK container is used only to have one container available for running the
+        // token commands before the cluster itself exists.
+        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6));
+        final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6));
+        cmdContainer = new ZKContainer<>(cliContainerName);
+        // NOTE(review): the Network created here is not closed in cleanup() - confirm whether
+        // it needs explicit disposal.
+        cmdContainer
+                .withNetwork(Network.newNetwork())
+                .withNetworkAliases(ZKContainer.NAME)
+                .withEnv("zkServers", ZKContainer.NAME);
+        cmdContainer.start();
+
+        // Must run before the spec is built: the broker/proxy settings below read the
+        // generated tokens, and the mount maps read publicKeyFile.
+        createKeysAndTokens(cmdContainer);
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(2)
+                .numProxies(1)
+                .clusterName(clusterName)
+                .brokerEnvs(getBrokerSettingsEnvs())
+                .proxyEnvs(getProxySettingsEnvs())
+                .brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                .proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                .build();
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+    }
+
+    /**
+     * Tears down everything {@link #setup()} created: the helper container, the Pulsar
+     * cluster and the temporary public-key file on the host.
+     *
+     * <p>Each step runs even if the previous one throws, so a failure to stop the helper
+     * container does not leave the cluster running or the key file behind.
+     */
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        markCurrentSetupNumberCleaned();
+        try {
+            if (cmdContainer != null) {
+                cmdContainer.stop();
+            }
+        } finally {
+            try {
+                if (pulsarCluster != null) {
+                    pulsarCluster.stop();
+                }
+            } finally {
+                // createKeysAndTokens() writes the public key to a temp file on the host;
+                // remove it so repeated runs do not accumulate key files.
+                if (publicKeyFile != null) {
+                    if (!publicKeyFile.delete()) {
+                        log.warn("Failed to delete temporary public key file {}", publicKeyFile);
+                    }
+                    publicKeyFile = null;
+                }
+            }
+        }
+    }
+
+    private Map<String, String> getBrokerSettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("authenticateOriginalAuthData", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    private Map<String, String> getProxySettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("forwardAuthorizationCredentials", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    /**
+     * Generates a key pair inside the given container, copies the public key to a temporary
+     * file on the host (stored in {@code publicKeyFile}), and creates one token each for the
+     * regular client, the super user and the proxy role.
+     *
+     * @param container a started container in which the pulsar command script can be executed
+     * @throws Exception if a command fails or the public key cannot be written to the host
+     */
+    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+        container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
+                        "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+        // Read the public key as raw bytes so it can be mounted into brokers and proxy later.
+        byte[] publicKeyBytes = DockerUtils
+                .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
+                        "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
+                .getStdout();
+
+        // NOTE(review): the host directory is hard-coded to /tmp - presumably so that the file
+        // lives in a path Docker can mount; confirm before switching to the default temp dir.
+        publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
+        Files.write(publicKeyBytes, publicKeyFile);
+
+        clientAuthToken = createToken(container, REGULAR_USER_ROLE);
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = createToken(container, SUPER_USER_ROLE);
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = createToken(container, PROXY_ROLE);
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    /**
+     * Creates a token for the given subject, signed with the private key previously generated
+     * inside the container, and returns the command's trimmed standard output.
+     */
+    private String createToken(PulsarContainer container, String subject) throws Exception {
+        return container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", subject)
+                .getStdout().trim();
+    }
+
+    /**
+     * Verifies that listing the topics of a namespace is rejected with status 401 for a role
+     * that has no permissions, and succeeds once the role has been granted the consume
+     * permission on that namespace.
+     */
+    @Test
+    public void testGetTopicsOfNamespaceOpsWithConsumePermission() throws Exception {
+        @Cleanup
+        PulsarAdmin superUserAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .authentication(AuthenticationFactory.token(superUserAuthToken))
+                .build();
+
+        @Cleanup
+        PulsarAdmin clientAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .authentication(AuthenticationFactory.token(clientAuthToken))
+                .build();
+
+        // Without any granted permission the listing must be rejected.
+        try {
+            clientAdmin.namespaces().getTopics("public/default");
+            fail("list topics operation should fail because the client hasn't permission to do");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 401);
+        }
+
+        // Grant consume permission to the role. The set is built with java.util.Collections
+        // instead of the Elasticsearch-internal Set helper, so the test does not depend on a
+        // class that is only present transitively.
+        superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                REGULAR_USER_ROLE, Collections.singleton(AuthAction.consume));
+
+        // With consume permission the same listing must now succeed.
+        List<String> topics = clientAdmin.namespaces().getTopics("public/default");
+        assertEquals(topics.size(), 0);
+    }
+}