You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/21 04:34:44 UTC
[pulsar] branch master updated: Add auth action for package
management service (#8893)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c5d6b1 Add auth action for package management service (#8893)
7c5d6b1 is described below
commit 7c5d6b1ce091df2adbadf06a1374848faf6de273
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Jan 21 12:34:05 2021 +0800
Add auth action for package management service (#8893)
Master Issue: #8676
*Motivation*
Add the auth action 'packages' for package management operations.
Only a role that has been granted the 'packages' permission (or a
tenant admin) can perform package operations.
*Modifications*
- Add the 'packages' auth action and check permissions when accessing the package management REST API
- Add integration test for this
---
.github/workflows/ci-integration-cli.yaml | 7 +-
.../authorization/PulsarAuthorizationProvider.java | 24 ++-
.../pulsar/broker/admin/impl/PackagesBase.java | 79 +++++++-
.../pulsar/common/policies/data/AuthAction.java | 3 +
.../common/policies/data/NamespaceOperation.java | 2 +
.../auth/admin/PackagesOpsWithAuthTest.java | 207 +++++++++++++++++++++
.../integration/topologies/PulsarCluster.java | 6 +
.../integration/topologies/PulsarClusterSpec.java | 10 +
.../integration/src/test/resources/pulsar-auth.xml | 28 +++
9 files changed, 353 insertions(+), 13 deletions(-)
diff --git a/.github/workflows/ci-integration-cli.yaml b/.github/workflows/ci-integration-cli.yaml
index 89a0777..a1e0b45 100644
--- a/.github/workflows/ci-integration-cli.yaml
+++ b/.github/workflows/ci-integration-cli.yaml
@@ -88,6 +88,11 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
- - name: run integration tests
+ - name: run pulsar cli integration tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-cli.xml -DintegrationTests -DredirectTestOutputToFile=false
+
+ - name: run pulsar auth integration tests
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-auth.xml -DintegrationTests -DredirectTestOutputToFile=false
+
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index ca7ca9e..3297f3f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -219,20 +219,20 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
@Override
public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
- return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.functions);
+ return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.functions);
}
@Override
public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
- return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.sources);
+ return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sources);
}
@Override
public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
- return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
+ return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
}
- private CompletableFuture<Boolean> allowFunctionSourceSinkOpsAsync(NamespaceName namespaceName, String role,
+ private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData,
AuthAction authAction) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
@@ -538,7 +538,21 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
String role,
NamespaceOperation operation,
AuthenticationDataSource authData) {
- return validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
+ CompletableFuture<Boolean> isAuthorizedFuture;
+ if (operation == NamespaceOperation.PACKAGES) {
+ isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages);
+ } else {
+ isAuthorizedFuture = CompletableFuture.completedFuture(false);
+ }
+ CompletableFuture<Boolean> isTenantAdminFuture = validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
+ return isTenantAdminFuture.thenCombine(isAuthorizedFuture, (isTenantAdmin, isAuthorized) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Verify if role {} is allowed to {} to topic {}:"
+ + " isTenantAdmin={}, isAuthorized={}",
+ role, operation, namespaceName, isTenantAdmin, isAuthorized);
+ }
+ return isTenantAdmin || isAuthorized;
+ });
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
index b0408b4..ccb611d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
@@ -21,21 +21,27 @@ package org.apache.pulsar.broker.admin.impl;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
import org.apache.pulsar.packages.management.core.common.PackageType;
import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
-
@Slf4j
public class PackagesBase extends AdminResource {
+
+ private AuthorizationService authorizationService;
+
private PackagesManagement getPackagesManagement() {
return pulsar().getPackagesManagement();
}
@@ -57,7 +63,10 @@ public class PackagesBase extends AdminResource {
asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, throwable.getMessage()));
} else if (throwable instanceof PackagesManagementException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, throwable.getMessage()));
+ } else if (throwable instanceof WebApplicationException) {
+ asyncResponse.resume(throwable);
} else {
+ log.error("Encountered unexpected error", throwable);
asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, throwable.getMessage()));
}
return null;
@@ -65,7 +74,8 @@ public class PackagesBase extends AdminResource {
protected void internalGetMetadata(String type, String tenant, String namespace, String packageName,
String version, AsyncResponse asyncResponse) {
- getPackageNameAsync(type, tenant, namespace, packageName, version)
+ checkPermissions(tenant, namespace)
+ .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
.thenCompose(name -> getPackagesManagement().getMeta(name))
.thenAccept(asyncResponse::resume)
.exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -73,7 +83,8 @@ public class PackagesBase extends AdminResource {
protected void internalUpdateMetadata(String type, String tenant, String namespace, String packageName,
String version, PackageMetadata metadata, AsyncResponse asyncResponse) {
- getPackageNameAsync(type, tenant, namespace, packageName, version)
+ checkPermissions(tenant, namespace)
+ .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
.thenCompose(name -> getPackagesManagement().updateMeta(name, metadata))
.thenAccept(ignore -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -82,6 +93,17 @@ public class PackagesBase extends AdminResource {
protected StreamingOutput internalDownload(String type, String tenant, String namespace,
String packageName, String version) {
try {
+ checkPermissions(tenant, namespace).get();
+ } catch (InterruptedException e) {
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof WebApplicationException) {
+ throw (WebApplicationException) e.getCause();
+ } else {
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
+ }
+ }
+ try {
PackageName name = PackageName.get(type, tenant, namespace, packageName, version);
return output -> {
try {
@@ -104,7 +126,8 @@ public class PackagesBase extends AdminResource {
protected void internalUpload(String type, String tenant, String namespace, String packageName, String version,
PackageMetadata metadata, InputStream uploadedInputStream,
AsyncResponse asyncResponse) {
- getPackageNameAsync(type, tenant, namespace, packageName, version)
+ checkPermissions(tenant, namespace)
+ .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
.thenCompose(name -> getPackagesManagement().upload(name, metadata, uploadedInputStream))
.thenAccept(ignore -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -112,7 +135,8 @@ public class PackagesBase extends AdminResource {
protected void internalDelete(String type, String tenant, String namespace, String packageName, String version,
AsyncResponse asyncResponse) {
- getPackageNameAsync(type, tenant, namespace, packageName, version)
+ checkPermissions(tenant, namespace)
+ .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, version))
.thenCompose(name -> getPackagesManagement().delete(name))
.thenAccept(ignore -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -120,7 +144,8 @@ public class PackagesBase extends AdminResource {
protected void internalListVersions(String type, String tenant, String namespace, String packageName,
AsyncResponse asyncResponse) {
- getPackageNameAsync(type, tenant, namespace, packageName, "")
+ checkPermissions(tenant, namespace)
+ .thenCompose(ignore -> getPackageNameAsync(type, tenant, namespace, packageName, ""))
.thenCompose(name -> getPackagesManagement().list(name))
.thenAccept(asyncResponse::resume)
.exceptionally(e -> handleError(e.getCause(), asyncResponse));
@@ -129,11 +154,51 @@ public class PackagesBase extends AdminResource {
protected void internalListPackages(String type, String tenant, String namespace, AsyncResponse asyncResponse) {
try {
PackageType packageType = PackageType.getEnum(type);
- getPackagesManagement().list(packageType, tenant, namespace)
+ checkPermissions(tenant, namespace)
+ .thenCompose(ignore -> getPackagesManagement().list(packageType, tenant, namespace))
.thenAccept(asyncResponse::resume)
.exceptionally(e -> handleError(e.getCause(), asyncResponse));
} catch (IllegalArgumentException iae) {
asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, iae.getMessage()));
}
}
+
+ private CompletableFuture<Void> checkPermissions(String tenant, String namespace) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ if (config().isAuthenticationEnabled()) {
+ NamespaceName namespaceName;
+ try {
+ namespaceName = NamespaceName.get(tenant, namespace);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ return future;
+ }
+ getAuthorizationService()
+ .allowNamespaceOperationAsync(namespaceName, NamespaceOperation.PACKAGES, originalPrincipal(),
+ clientAppId(), clientAuthData())
+ .whenComplete((hasPermission, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+ if (hasPermission) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(new RestException(Response.Status.UNAUTHORIZED, String.format(
+ "Role %s has not the 'package' permission to do the packages operations.", clientAppId())));
+ }
+ });
+ } else {
+ future.complete(null);
+ }
+ return future;
+ }
+
+    /**
+     * Returns the authorization service, looking it up from the broker service on first use
+     * and reusing the stored reference afterwards.
+     */
+    private AuthorizationService getAuthorizationService() {
+        if (authorizationService != null) {
+            return authorizationService;
+        }
+        authorizationService = pulsar().getBrokerService().getAuthorizationService();
+        return authorizationService;
+    }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
index 646ca03..9a1b36c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
@@ -36,4 +36,7 @@ public enum AuthAction {
/** Permissions for sinks ops. **/
sinks,
+
+ /** Permissions for packages ops. **/
+ packages
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
index bda93c4..ceef164 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
@@ -38,4 +38,6 @@ public enum NamespaceOperation {
CLEAR_BACKLOG,
UNSUBSCRIBE,
+
+ PACKAGES,
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java
new file mode 100644
index 0000000..aeb7089
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.admin;
+
+import com.google.common.io.Files;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.elasticsearch.common.collect.Set;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+
+/**
+ * Integration test verifying that package management operations are rejected for a role that
+ * lacks the {@code packages} permission and succeed once that permission has been granted.
+ */
+@Slf4j
+public class PackagesOpsWithAuthTest {
+
+    private static final String CLUSTER_PREFIX = "package-auth";
+    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
+    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
+
+    private static final String SUPER_USER_ROLE = "super-user";
+    private static final String PROXY_ROLE = "proxy";
+    private static final String REGULAR_USER_ROLE = "client";
+
+    private String superUserAuthToken;
+    private String proxyAuthToken;
+    private String clientAuthToken;
+    // Host-side copy of the generated public key; bind-mounted into the brokers and the proxy.
+    private File publicKeyFile;
+
+    private PulsarCluster pulsarCluster;
+    private PulsarContainer cmdContainer;
+
+    /**
+     * Starts a helper container to generate the key pair and tokens, then starts a cluster whose
+     * brokers and proxy are configured for token authentication and authorization.
+     */
+    @BeforeClass
+    public void setup() throws Exception {
+        // Before starting the cluster, generate the secret key and the token
+        // Use Zk container to have 1 container available before starting the cluster
+        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6));
+        final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6));
+        cmdContainer = new ZKContainer<>(cliContainerName);
+        cmdContainer
+            .withNetwork(Network.newNetwork())
+            .withNetworkAliases(ZKContainer.NAME)
+            .withEnv("zkServers", ZKContainer.NAME);
+        cmdContainer.start();
+
+        createKeysAndTokens(cmdContainer);
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .numBookies(2)
+            .numBrokers(2)
+            .numProxies(1)
+            .clusterName(clusterName)
+            .brokerEnvs(getBrokerSettingsEnvs())
+            .proxyEnvs(getProxySettingsEnvs())
+            .brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+            .proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+            .build();
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+    }
+
+    /**
+     * Releases everything created by {@link #setup()}.
+     *
+     * <p>{@code alwaysRun} makes this run even when setup failed part-way, and the nested
+     * {@code finally} blocks ensure a failure while stopping one resource does not leak the others.
+     */
+    @AfterClass(alwaysRun = true)
+    public void teardown() {
+        try {
+            if (cmdContainer != null) {
+                cmdContainer.stop();
+            }
+        } finally {
+            try {
+                if (pulsarCluster != null) {
+                    pulsarCluster.stop();
+                }
+            } finally {
+                if (publicKeyFile != null && !publicKeyFile.delete()) {
+                    log.warn("Failed to delete temporary public key file {}", publicKeyFile);
+                }
+            }
+        }
+    }
+
+    /** Builds the broker environment: packages management on, token authentication and authorization on. */
+    private Map<String, String> getBrokerSettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("enablePackagesManagement", "true");
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("authenticateOriginalAuthData", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    /** Builds the proxy environment: token authentication and authorization on, credentials forwarded. */
+    private Map<String, String> getProxySettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("forwardAuthorizationCredentials", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    /**
+     * Generates a key pair inside the given container, copies the public key to a temporary file
+     * on the host, and creates the client, super-user and proxy tokens.
+     *
+     * @param container running container in which the {@code pulsar tokens} commands are executed
+     */
+    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+        container
+            .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
+                "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+        byte[] publicKeyBytes = DockerUtils
+            .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
+                "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
+            .getStdout();
+
+        publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
+        Files.write(publicKeyBytes, publicKeyFile);
+
+        clientAuthToken = createToken(container, REGULAR_USER_ROLE);
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = createToken(container, SUPER_USER_ROLE);
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = createToken(container, PROXY_ROLE);
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    /** Creates a token for {@code role}, signed with the private key generated inside the container. */
+    private String createToken(PulsarContainer container, String role) throws Exception {
+        return container
+            .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                "--subject", role)
+            .getStdout().trim();
+    }
+
+    /**
+     * Lists packages as the regular role before and after it is granted the {@code packages}
+     * permission, expecting a 401 first and an empty listing afterwards.
+     */
+    @Test
+    public void testPackagesOps() throws Exception {
+        @Cleanup
+        PulsarAdmin superUserAdmin = PulsarAdmin.builder()
+            .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+            .authentication(AuthenticationFactory.token(superUserAuthToken))
+            .build();
+
+        @Cleanup
+        PulsarAdmin clientAdmin = PulsarAdmin.builder()
+            .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+            .authentication(AuthenticationFactory.token(clientAuthToken))
+            .build();
+
+        // Without any granted permission the operation must be rejected.
+        try {
+            clientAdmin.packages().listPackages("function", "public/default");
+            fail("list package operation should fail because the client hasn't permission to do");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 401);
+        }
+
+        // Grant the packages permission to the role.
+        superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+            REGULAR_USER_ROLE, Collections.singleton(AuthAction.packages));
+
+        // The same operation must now succeed; no package has been uploaded, so the list is empty.
+        List<String> packagesName = clientAdmin.packages().listPackages("function", "public/default");
+        assertEquals(packagesName.size(), 0);
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 8d835dd..8d5404c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -137,6 +137,9 @@ public class PulsarCluster {
if (spec.proxyEnvs != null) {
spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
}
+ if (spec.proxyMountFiles != null) {
+ spec.proxyMountFiles.forEach(this.proxyContainer::withFileSystemBind);
+ }
// create bookies
bookieContainers.putAll(
@@ -174,6 +177,9 @@ public class PulsarCluster {
if (spec.brokerEnvs != null) {
brokerContainer.withEnv(spec.brokerEnvs);
}
+ if (spec.brokerMountFiles != null) {
+ spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind);
+ }
return brokerContainer;
}
));
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index b0c49bd..9dcfcfb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -142,4 +142,14 @@ public class PulsarClusterSpec {
* Specify envs for broker.
*/
Map<String, String> brokerEnvs;
+
+ /**
+ * Files to bind-mount into the proxy container, keyed by host path with the in-container path as value.
+ */
+ Map<String, String> proxyMountFiles;
+
+ /**
+ * Files to bind-mount into each broker container, keyed by host path with the in-container path as value.
+ */
+ Map<String, String> brokerMountFiles;
}
diff --git a/tests/integration/src/test/resources/pulsar-auth.xml b/tests/integration/src/test/resources/pulsar-auth.xml
new file mode 100644
index 0000000..d361c57
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-auth.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar Auth Integration Tests" verbose="2" annotations="JDK">
+ <test name="pulsar-auth-test-suite" preserve-order="true" >
+ <classes>
+ <class name="org.apache.pulsar.tests.integration.auth.admin.PackagesOpsWithAuthTest"/>
+ </classes>
+ </test>
+</suite>