You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 09:31:25 UTC

[pulsar] 02/12: [Authorization] Support GET_METADATA topic op after enable auth (#12656)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 74c91a30224cc25c20054e1f679b3e2667238891
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Nov 8 15:21:43 2021 +0800

    [Authorization] Support GET_METADATA topic op after enable auth (#12656)
    
    ### Motivation
    Currently, we can get the internal stats of a topic through `bin/pulsar-admin topics stats-internal tn1/ns1/tp1`, and can also get the ledger metadata by specifying the `--metadata` flag.
    
    However, I found that `PulsarAuthorizationProvider` lacks support for the `GET_METADATA` topic operation when verifying a role's authorization; see the code below:
    https://github.com/apache/pulsar/blob/08a49c06bff4a52d26319a114961aed6cb6c4791/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L1162-L1164
    https://github.com/apache/pulsar/blob/08a49c06bff4a52d26319a114961aed6cb6c4791/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L567-L596
    
    The purpose of this PR is to allow a role that has `lookup` authorization on a topic to perform the `GET_METADATA` operation, i.e. to read the topic's ledger metadata.
    
    (cherry picked from commit e4767355bc7fa0a2e6bbb09acf1f93e4cd7cbe0b)
---
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../auth/admin/GetMetadataOfTopicWithAuthTest.java | 213 +++++++++++++++++++++
 2 files changed, 214 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index e339987..06291ee 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -582,6 +582,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
         switch (operation) {
             case LOOKUP:
             case GET_STATS:
+            case GET_METADATA:
                 isAuthorizedFuture = canLookupAsync(topicName, role, authData);
                 break;
             case PRODUCE:
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
new file mode 100644
index 0000000..7578629
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.elasticsearch.common.collect.Set;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * GetMetadataOfTopicWithAuthTest tests the GET_METADATA topic operation both with and without the proper permission.
+ */
+@Slf4j
+public class GetMetadataOfTopicWithAuthTest extends TestRetrySupport {
+
+    private static final String CLUSTER_PREFIX = "get-metadata-auth";
+    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
+    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
+
+    private static final String SUPER_USER_ROLE = "super-user";
+    private String superUserAuthToken;
+    private static final String PROXY_ROLE = "proxy";
+    private String proxyAuthToken;
+    private static final String REGULAR_USER_ROLE = "client";
+    private String clientAuthToken;
+    private File publicKeyFile;
+
+    private PulsarCluster pulsarCluster;
+    private PulsarContainer cmdContainer;
+
+    /**
+     * Boots the test environment: first a stand-alone helper container in which the key pair and
+     * the tokens are generated, then a Pulsar cluster with 2 bookies, 2 brokers and 1 proxy that
+     * is configured from {@link #getBrokerSettingsEnvs()} and {@link #getProxySettingsEnvs()}.
+     *
+     * @throws Exception if key/token generation or starting the cluster fails
+     */
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+
+        final String clusterSuffix = RandomStringUtils.randomAlphabetic(6);
+        final String cliSuffix = RandomStringUtils.randomAlphabetic(6);
+        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, clusterSuffix);
+        final String cliContainerName = String.format("%s-%s", "cli", cliSuffix);
+
+        // The secret key and the tokens have to exist before the cluster is configured, so a
+        // ZKContainer is started on its own just to have one container available for the CLI.
+        cmdContainer = new ZKContainer<>(cliContainerName);
+        cmdContainer.withNetwork(Network.newNetwork());
+        cmdContainer.withNetworkAliases(ZKContainer.NAME);
+        cmdContainer.withEnv("zkServers", ZKContainer.NAME);
+        cmdContainer.start();
+
+        createKeysAndTokens(cmdContainer);
+
+        // Brokers and proxy all get the host-side copy of the public key mounted at the same path.
+        final String publicKeyOnHost = publicKeyFile.toString();
+        PulsarClusterSpec clusterSpec = PulsarClusterSpec.builder()
+                .clusterName(clusterName)
+                .numBookies(2)
+                .numBrokers(2)
+                .numProxies(1)
+                .brokerEnvs(getBrokerSettingsEnvs())
+                .brokerMountFiles(Collections.singletonMap(publicKeyOnHost, PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                .proxyEnvs(getProxySettingsEnvs())
+                .proxyMountFiles(Collections.singletonMap(publicKeyOnHost, PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                .build();
+
+        pulsarCluster = PulsarCluster.forSpec(clusterSpec);
+        pulsarCluster.start();
+    }
+
+    /**
+     * Tears down what {@link #setup()} created: the helper container, the Pulsar cluster and the
+     * host-side temporary copy of the public key.
+     */
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        markCurrentSetupNumberCleaned();
+        try {
+            if (cmdContainer != null) {
+                cmdContainer.stop();
+            }
+            if (pulsarCluster != null) {
+                pulsarCluster.stop();
+            }
+        } finally {
+            // createKeysAndTokens() writes the public key to a temp file on the host. Remove it
+            // here, even when stopping a container failed, so that runs do not pile up files.
+            if (publicKeyFile != null) {
+                if (publicKeyFile.exists() && !publicKeyFile.delete()) {
+                    log.warn("Failed to delete temporary public key file {}", publicKeyFile);
+                }
+                publicKeyFile = null;
+            }
+        }
+    }
+
+    private Map<String, String> getBrokerSettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("authenticateOriginalAuthData", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    private Map<String, String> getProxySettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("forwardAuthorizationCredentials", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    /**
+     * Generates a key pair inside {@code container}, copies the public key to a temporary file on
+     * the host (stored in {@code publicKeyFile}) and creates one token each for the regular
+     * client, super-user and proxy roles, storing them in the corresponding fields.
+     *
+     * @param container a started container in which the Pulsar command script can be executed
+     * @throws Exception if running a command in the container or writing the key file fails
+     */
+    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+        // Step 1: create the key pair inside the container.
+        container.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
+                "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+        // Step 2: read the public key back as raw bytes and persist it on the host.
+        byte[] rawPublicKey = DockerUtils.runCommandWithRawOutput(
+                container.getDockerClient(), container.getContainerId(),
+                "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER).getStdout();
+        publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
+        Files.write(rawPublicKey, publicKeyFile);
+
+        // Step 3: create one token per role, all signed with the private key from step 1.
+        clientAuthToken = issueTokenFor(container, REGULAR_USER_ROLE);
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = issueTokenFor(container, SUPER_USER_ROLE);
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = issueTokenFor(container, PROXY_ROLE);
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    /**
+     * Runs the "tokens create" command inside {@code container} for the given subject.
+     *
+     * @param container the container holding the private key
+     * @param subject the value passed as {@code --subject}
+     * @return the trimmed standard output of the command
+     * @throws Exception if executing the command fails
+     */
+    private String issueTokenFor(PulsarContainer container, String subject) throws Exception {
+        return container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", subject)
+                .getStdout().trim();
+    }
+
+    /**
+     * Checks that fetching a topic's internal stats together with its metadata is rejected with
+     * status code 401 while the client role has no permissions, and succeeds once the role has
+     * been granted the {@code consume} action on the topic.
+     *
+     * @throws Exception if an admin call fails unexpectedly
+     */
+    @Test
+    public void testGetMetadataOfTopicWithLookupPermission() throws Exception {
+        final String partitionedTopic = "public/default/test";
+        final String firstPartition = "public/default/test-partition-0";
+        final String adminUrl = pulsarCluster.getHttpServiceUrl();
+
+        @Cleanup
+        PulsarAdmin adminAsSuperUser = PulsarAdmin.builder()
+                .serviceHttpUrl(adminUrl)
+                .authentication(AuthenticationFactory.token(superUserAuthToken))
+                .build();
+
+        @Cleanup
+        PulsarAdmin adminAsClient = PulsarAdmin.builder()
+                .serviceHttpUrl(adminUrl)
+                .authentication(AuthenticationFactory.token(clientAuthToken))
+                .build();
+
+        // Create a partitioned topic with a single partition.
+        adminAsSuperUser.topics().createPartitionedTopic(partitionedTopic, 1);
+
+        // Nothing has been granted to the client role yet, so the request must be rejected.
+        try {
+            adminAsClient.topics().getInternalStats(firstPartition, true);
+            fail("get internal stats and metadata operation should fail because the client hasn't permission to do");
+        } catch (PulsarAdminException rejected) {
+            assertEquals(rejected.getStatusCode(), 401);
+        }
+
+        // Grant only the consume action to the client role; the test relies on this being enough
+        // for the lookup-level check that GET_METADATA is mapped to.
+        adminAsSuperUser.topics().grantPermission(partitionedTopic,
+                REGULAR_USER_ROLE, Set.of(AuthAction.consume));
+
+        // The same request for internal stats and metadata should now succeed.
+        PersistentTopicInternalStats statsWithMetadata = adminAsClient.topics()
+                .getInternalStats(firstPartition, true);
+        assertNotNull(statsWithMetadata);
+    }
+}