You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/21 04:34:44 UTC

[pulsar] branch master updated: Add auth action for package management service (#8893)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c5d6b1  Add auth action for package management service (#8893)
7c5d6b1 is described below

commit 7c5d6b1ce091df2adbadf06a1374848faf6de273
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Jan 21 12:34:05 2021 +0800

    Add auth action for package management service (#8893)
    
    Master Issue: #8676
    
    *Motivation*
    
    Add the 'packages' auth action for package management operations.
    Only a role that has been granted the 'packages' permission can
    perform package operations.
    
    *Modifications*
    
    - Add the 'packages' auth action and check permissions when accessing the packages REST API
    - Add an integration test for the new permission check
---
 .github/workflows/ci-integration-cli.yaml          |   7 +-
 .../authorization/PulsarAuthorizationProvider.java |  24 ++-
 .../pulsar/broker/admin/impl/PackagesBase.java     |  79 +++++++-
 .../pulsar/common/policies/data/AuthAction.java    |   3 +
 .../common/policies/data/NamespaceOperation.java   |   2 +
 .../auth/admin/PackagesOpsWithAuthTest.java        | 207 +++++++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |   6 +
 .../integration/topologies/PulsarClusterSpec.java  |  10 +
 .../integration/src/test/resources/pulsar-auth.xml |  28 +++
 9 files changed, 353 insertions(+), 13 deletions(-)

diff --git a/.github/workflows/ci-integration-cli.yaml b/.github/workflows/ci-integration-cli.yaml
index 89a0777..a1e0b45 100644
--- a/.github/workflows/ci-integration-cli.yaml
+++ b/.github/workflows/ci-integration-cli.yaml
@@ -88,6 +88,11 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
 
-      - name: run integration tests
+      - name: run pulsar cli integration tests
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-cli.xml -DintegrationTests -DredirectTestOutputToFile=false
+
+      - name: run pulsar auth integration tests
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-auth.xml -DintegrationTests -DredirectTestOutputToFile=false
+
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index ca7ca9e..3297f3f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -219,20 +219,20 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
 
     @Override
     public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
-        return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.functions);
+        return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.functions);
     }
 
     @Override
     public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
-        return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.sources);
+        return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sources);
     }
 
     @Override
     public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
-        return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
+        return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
     }
 
-    private CompletableFuture<Boolean> allowFunctionSourceSinkOpsAsync(NamespaceName namespaceName, String role,
+    private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
                                                                        AuthenticationDataSource authenticationData,
                                                                        AuthAction authAction) {
         CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
@@ -538,7 +538,21 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                                                                    String role,
                                                                    NamespaceOperation operation,
                                                                    AuthenticationDataSource authData) {
-        return validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
+        CompletableFuture<Boolean> isAuthorizedFuture;
+        if (operation == NamespaceOperation.PACKAGES) {
+            isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages);
+        } else {
+            isAuthorizedFuture = CompletableFuture.completedFuture(false);
+        }
+        CompletableFuture<Boolean> isTenantAdminFuture = validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
+        return isTenantAdminFuture.thenCombine(isAuthorizedFuture, (isTenantAdmin, isAuthorized) -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Verify if role {} is allowed to {} to topic {}:"
+                        + " isTenantAdmin={}, isAuthorized={}",
+                    role, operation, namespaceName, isTenantAdmin, isAuthorized);
+            }
+            return isTenantAdmin || isAuthorized;
+        });
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
index b0408b4..ccb611d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
@@ -21,21 +21,27 @@ package org.apache.pulsar.broker.admin.impl;
 import java.io.InputStream;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.packages.management.core.PackagesManagement;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
 import org.apache.pulsar.packages.management.core.common.PackageName;
 import org.apache.pulsar.packages.management.core.common.PackageType;
 import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
 
-
 @Slf4j
 public class PackagesBase extends AdminResource {
+
+    private AuthorizationService authorizationService;
+
     private PackagesManagement getPackagesManagement() {
         return pulsar().getPackagesManagement();
     }
@@ -57,7 +63,10 @@ public class PackagesBase extends AdminResource {
             asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, throwable.getMessage()));
         } else if (throwable instanceof PackagesManagementException.NotFoundException) {
             asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, throwable.getMessage()));
+        } else if (throwable instanceof WebApplicationException) {
+            asyncResponse.resume(throwable);
         } else {
+            log.error("Encountered unexpected error", throwable);
             asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, throwable.getMessage()));
         }
         return null;
@@ -65,7 +74,8 @@ public class PackagesBase extends AdminResource {
 
     protected void internalGetMetadata(String type, String tenant, String namespace, String packageName,
                                                   String version, AsyncResponse asyncResponse) {
-        getPackageNameAsync(type, tenant, namespace, packageName, version)
+        checkPermissions(tenant, namespace)
+            .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
             .thenCompose(name -> getPackagesManagement().getMeta(name))
             .thenAccept(asyncResponse::resume)
             .exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -73,7 +83,8 @@ public class PackagesBase extends AdminResource {
 
     protected void internalUpdateMetadata(String type, String tenant, String namespace, String packageName,
                                           String version, PackageMetadata metadata, AsyncResponse asyncResponse) {
-        getPackageNameAsync(type, tenant, namespace, packageName, version)
+        checkPermissions(tenant, namespace)
+            .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
             .thenCompose(name -> getPackagesManagement().updateMeta(name, metadata))
             .thenAccept(ignore -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -82,6 +93,17 @@ public class PackagesBase extends AdminResource {
     protected StreamingOutput internalDownload(String type, String tenant, String namespace,
                                                String packageName, String version) {
         try {
+            checkPermissions(tenant, namespace).get();
+        } catch (InterruptedException e) {
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof WebApplicationException) {
+                throw (WebApplicationException) e.getCause();
+            } else {
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
+            }
+        }
+        try {
             PackageName name = PackageName.get(type, tenant, namespace, packageName, version);
             return output -> {
                 try {
@@ -104,7 +126,8 @@ public class PackagesBase extends AdminResource {
     protected void internalUpload(String type, String tenant, String namespace, String packageName, String version,
                                   PackageMetadata metadata, InputStream uploadedInputStream,
                                   AsyncResponse asyncResponse) {
-        getPackageNameAsync(type, tenant, namespace, packageName, version)
+        checkPermissions(tenant, namespace)
+            .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
             .thenCompose(name -> getPackagesManagement().upload(name, metadata, uploadedInputStream))
             .thenAccept(ignore -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -112,7 +135,8 @@ public class PackagesBase extends AdminResource {
 
     protected void internalDelete(String type, String tenant, String namespace, String packageName, String version,
                                   AsyncResponse asyncResponse) {
-        getPackageNameAsync(type, tenant, namespace, packageName, version)
+        checkPermissions(tenant, namespace)
+            .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
             .thenCompose(name -> getPackagesManagement().delete(name))
             .thenAccept(ignore -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -120,7 +144,8 @@ public class PackagesBase extends AdminResource {
 
     protected void internalListVersions(String type, String tenant, String namespace, String packageName,
                                                      AsyncResponse asyncResponse) {
-        getPackageNameAsync(type, tenant, namespace, packageName, "")
+        checkPermissions(tenant, namespace)
+            .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, ""))
             .thenCompose(name -> getPackagesManagement().list(name))
             .thenAccept(asyncResponse::resume)
             .exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -129,11 +154,51 @@ public class PackagesBase extends AdminResource {
     protected void internalListPackages(String type, String tenant, String namespace, AsyncResponse asyncResponse) {
         try {
             PackageType packageType = PackageType.getEnum(type);
-            getPackagesManagement().list(packageType, tenant, namespace)
+            checkPermissions(tenant, namespace)
+                .thenCompose(ignore -> getPackagesManagement().list(packageType, tenant, namespace))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(e -> handleError(e.getCause(), asyncResponse));
         } catch (IllegalArgumentException iae) {
             asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, iae.getMessage()));
         }
     }
+
+    private CompletableFuture<Void> checkPermissions(String tenant, String namespace) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        if (config().isAuthenticationEnabled()) {
+            NamespaceName namespaceName;
+            try {
+                namespaceName = NamespaceName.get(tenant, namespace);
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+                return future;
+            }
+            getAuthorizationService()
+                .allowNamespaceOperationAsync(namespaceName, NamespaceOperation.PACKAGES, originalPrincipal(),
+                    clientAppId(), clientAuthData())
+                .whenComplete((hasPermission, throwable) -> {
+                    if (throwable != null) {
+                        future.completeExceptionally(throwable);
+                        return;
+                    }
+                    if (hasPermission) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(new RestException(Response.Status.UNAUTHORIZED, String.format(
+                            "Role %s has not the 'package' permission to do the packages operations.", clientAppId())));
+                    }
+                });
+        } else {
+            future.complete(null);
+        }
+        return future;
+    }
+
+    private AuthorizationService getAuthorizationService() {
+        if (authorizationService == null) {
+            authorizationService = pulsar().getBrokerService().getAuthorizationService();
+            return authorizationService;
+        }
+        return authorizationService;
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
index 646ca03..9a1b36c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
@@ -36,4 +36,7 @@ public enum AuthAction {
 
     /** Permissions for sinks ops. **/
     sinks,
+
+    /** Permissions for packages ops. **/
+    packages
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
index bda93c4..ceef164 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
@@ -38,4 +38,6 @@ public enum NamespaceOperation {
 
     CLEAR_BACKLOG,
     UNSUBSCRIBE,
+
+    PACKAGES,
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java
new file mode 100644
index 0000000..aeb7089
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.admin;
+
+import com.google.common.io.Files;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.elasticsearch.common.collect.Set;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+
+/**
+ * PackagesOpsWithAuthTest will test all package operations with and without the proper permission.
+ */
+@Slf4j
+public class PackagesOpsWithAuthTest {
+
+    private static final String CLUSTER_PREFIX = "package-auth";
+    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
+    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
+
+    private static final String SUPER_USER_ROLE = "super-user";
+    private String superUserAuthToken;
+    private static final String PROXY_ROLE = "proxy";
+    private String proxyAuthToken;
+    private static final String REGULAR_USER_ROLE = "client";
+    private String clientAuthToken;
+    private File publicKeyFile;
+
+    private PulsarCluster pulsarCluster;
+    private PulsarContainer cmdContainer;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        // Before starting the cluster, generate the secret key and the token
+        // Use Zk container to have 1 container available before starting the cluster
+        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6));
+        final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6));
+        cmdContainer = new ZKContainer<>(cliContainerName);
+        cmdContainer
+            .withNetwork(Network.newNetwork())
+            .withNetworkAliases(ZKContainer.NAME)
+            .withEnv("zkServers", ZKContainer.NAME);
+        cmdContainer.start();
+
+        createKeysAndTokens(cmdContainer);
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .numBookies(2)
+            .numBrokers(2)
+            .numProxies(1)
+            .clusterName(clusterName)
+            .brokerEnvs(getBrokerSettingsEnvs())
+            .proxyEnvs(getProxySettingsEnvs())
+            .brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+            .proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+            .build();
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+    }
+
+    @AfterClass
+    public void teardown() {
+        if (cmdContainer != null) {
+            cmdContainer.stop();
+        }
+        if (pulsarCluster != null) {
+            pulsarCluster.stop();
+        }
+    }
+
+    private Map<String, String> getBrokerSettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("enablePackagesManagement", "true");
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("authenticateOriginalAuthData", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    private Map<String, String> getProxySettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("forwardAuthorizationCredentials", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+        container
+            .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
+                "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+        byte[] publicKeyBytes = DockerUtils
+            .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
+                "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
+            .getStdout();
+
+        publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
+        Files.write(publicKeyBytes, publicKeyFile);
+
+        clientAuthToken = container
+            .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                "--subject", REGULAR_USER_ROLE)
+            .getStdout().trim();
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = container
+            .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                "--subject", SUPER_USER_ROLE)
+            .getStdout().trim();
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = container
+            .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                "--subject", PROXY_ROLE)
+            .getStdout().trim();
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    @Test
+    public void testPackagesOps() throws Exception {
+        @Cleanup
+        PulsarAdmin superUserAdmin = PulsarAdmin.builder()
+            .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+            .authentication(AuthenticationFactory.token(superUserAuthToken))
+            .build();
+
+        @Cleanup
+        PulsarAdmin clientAdmin = PulsarAdmin.builder()
+            .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+            .authentication(AuthenticationFactory.token(clientAuthToken))
+            .build();
+
+        // attempt an operation before any permissions have been granted
+        try {
+            List<String> packagesName = clientAdmin.packages().listPackages("function", "public/default");
+            fail("list package operation should fail because the client hasn't permission to do");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 401);
+        }
+
+        // grant package permission to the role
+        superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+            REGULAR_USER_ROLE, Set.of(AuthAction.packages));
+
+        // then retry the package operation; it should now succeed
+        List<String> packagesName = clientAdmin.packages().listPackages("function", "public/default");
+        assertEquals(packagesName.size(), 0);
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 8d835dd..8d5404c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -137,6 +137,9 @@ public class PulsarCluster {
         if (spec.proxyEnvs != null) {
             spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
         }
+        if (spec.proxyMountFiles != null) {
+            spec.proxyMountFiles.forEach(this.proxyContainer::withFileSystemBind);
+        }
 
         // create bookies
         bookieContainers.putAll(
@@ -174,6 +177,9 @@ public class PulsarCluster {
                     if (spec.brokerEnvs != null) {
                         brokerContainer.withEnv(spec.brokerEnvs);
                     }
+                    if (spec.brokerMountFiles != null) {
+                        spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind);
+                    }
                     return brokerContainer;
                 }
             ));
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index b0c49bd..9dcfcfb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -142,4 +142,14 @@ public class PulsarClusterSpec {
      * Specify envs for broker.
      */
     Map<String, String> brokerEnvs;
+
+    /**
+     * Specify mount files for proxy.
+     */
+    Map<String, String> proxyMountFiles;
+
+    /**
+     * Specify mount files for broker.
+     */
+    Map<String, String> brokerMountFiles;
 }
diff --git a/tests/integration/src/test/resources/pulsar-auth.xml b/tests/integration/src/test/resources/pulsar-auth.xml
new file mode 100644
index 0000000..d361c57
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-auth.xml
@@ -0,0 +1,28 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar Auth Integration Tests" verbose="2" annotations="JDK">
+    <test name="pulsar-auth-test-suite" preserve-order="true" >
+        <classes>
+            <class name="org.apache.pulsar.tests.integration.auth.admin.PackagesOpsWithAuthTest"/>
+        </classes>
+    </test>
+</suite>