You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/15 10:47:00 UTC

[GitHub] massakam closed pull request #899: Add subscription auth mode by prefix

massakam closed pull request #899: Add subscription auth mode by prefix
URL: https://github.com/apache/incubator-pulsar/pull/899
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index e33e2a5a6..9fa31ccfd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -22,8 +22,10 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -78,14 +80,52 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
      *            the fully qualified destination name associated with the destination.
      * @param role
      *            the app id used to receive messages from the destination.
+     * @param subscription
+     *            the subscription name defined by the client
      */
-    public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role) {
-        return checkAuthorization(destination, role, AuthAction.consume);
+    public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role, String subscription) {
+        CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
+        try {
+            configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
+                if (!policies.isPresent()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Policies node couldn't be found for destination : {}", destination);
+                    }
+                } else {
+                    if (isNotBlank(subscription)) {
+                        switch (policies.get().subscription_auth_mode) {
+                        case Prefix:
+                            if (!subscription.startsWith(role)) {
+                                PulsarServerException ex = new PulsarServerException(
+                                        String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination));
+                                permissionFuture.completeExceptionally(ex);
+                                return;
+                            }
+                            break;
+                        default:
+                            break;
+                        }
+                    }
+                }
+                checkAuthorization(destination, role, AuthAction.consume).thenAccept(isAuthorized -> {
+                    permissionFuture.complete(isAuthorized);
+                });
+            }).exceptionally(ex -> {
+                log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
+                        ex);
+                permissionFuture.completeExceptionally(ex);
+                return null;
+            });
+        } catch (Exception e) {
+            log.warn("Client  with Role - {} failed to get permissions for destination - {}", role, destination, e);
+            permissionFuture.completeExceptionally(e);
+        }
+        return permissionFuture;
     }
 
-    public boolean canConsume(DestinationName destination, String role) throws Exception {
+    public boolean canConsume(DestinationName destination, String role, String subscription) throws Exception {
         try {
-            return canConsumeAsync(destination, role).get(cacheTimeOutInSec, SECONDS);
+            return canConsumeAsync(destination, role, subscription).get(cacheTimeOutInSec, SECONDS);
         } catch (InterruptedException e) {
             log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
             throw e;
@@ -107,7 +147,7 @@ public boolean canConsume(DestinationName destination, String role) throws Excep
      * @throws Exception
      */
     public boolean canLookup(DestinationName destination, String role) throws Exception {
-        return canProduce(destination, role) || canConsume(destination, role);
+        return canProduce(destination, role) || canConsume(destination, role, null);
     }
 
     private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
index 5da4ca2da..cb8cf72a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
@@ -75,6 +75,7 @@
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
@@ -1443,6 +1444,49 @@ public void unsubscribeNamespaceBundle(@PathParam("property") String property, @
                 nsName.toString(), bundleRange);
     }
 
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/subscriptionAuthMode")
+    @ApiOperation(value = " Set a subscription auth mode for all the destinations on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void setSubscriptionAuthMode(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, SubscriptionAuthMode subscriptionAuthMode) {
+        validateAdminAccessOnProperty(property);
+        validatePoliciesReadOnlyAccess();
+
+        if (subscriptionAuthMode == null) {
+            subscriptionAuthMode = SubscriptionAuthMode.None;
+        }
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, property, cluster, namespace);
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            policies.subscription_auth_mode = subscriptionAuthMode;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
+            log.info("[{}] Successfully updated subscription auth mode: namespace={}/{}/{}, map={}", clientAppId(), property,
+                    cluster, namespace, jsonMapper().writeValueAsString(policies.backlog_quota_map));
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: does not exist", clientAppId(),
+                    property, cluster, namespace);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: concurrent modification",
+                    clientAppId(), property, cluster, namespace);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update subscription auth mode for namespace {}/{}/{}", clientAppId(), property,
+                    cluster, namespace, e);
+            throw new RestException(e);
+        }
+    }
+
     private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
         try {
             List<Topic> topicList = pulsar().getBrokerService()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 07b0a462c..64bd4243c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -458,7 +458,7 @@ public void checkPermissions() {
         DestinationName destination = DestinationName.get(subscription.getDestination());
         if (cnx.getBrokerService().getAuthorizationManager() != null) {
             try {
-                if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) {
+                if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId, subscription.getName())) {
                     return;
                 }
             } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5600d654b..df8e577c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -37,6 +37,7 @@
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -349,7 +350,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
         if (service.isAuthorizationEnabled()) {
             authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
                     DestinationName.get(subscribe.getTopic()),
-                    originalPrincipal != null ? originalPrincipal : authRole);
+                    originalPrincipal != null ? originalPrincipal : authRole,
+                    subscribe.getSubscription());
         } else {
             authorizationFuture = CompletableFuture.completedFuture(true);
         }
@@ -460,6 +462,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
             }
             return null;
+        }).exceptionally(ex -> {
+            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+            if (ex.getCause() instanceof PulsarServerException) {
+                log.info(msg);
+            } else {
+                log.warn(msg);
+            }
+            ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
+            return null;
         });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 499d4f8de..0f79e7509 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -19,16 +19,16 @@
 package org.apache.pulsar.broker.auth;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 
 import java.util.EnumSet;
 
 import org.apache.pulsar.broker.authorization.AuthorizationManager;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -85,8 +85,8 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false);
 
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false);
 
@@ -94,7 +94,7 @@ void simple() throws Exception {
         waitForChange();
 
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
 
         // test for wildcard
 
@@ -102,7 +102,7 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
 
@@ -112,7 +112,7 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
 
@@ -120,7 +120,7 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
 
@@ -130,7 +130,7 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
 
@@ -143,7 +143,7 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false);
@@ -156,7 +156,7 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false);
@@ -166,7 +166,7 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false);
@@ -179,12 +179,34 @@ void simple() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false);
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my"), false);
 
+        admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "my.*");
+        admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "*.my");
+
+        // tests for subscription auth mode
+        admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "*", EnumSet.of(AuthAction.consume));
+        admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.Prefix);
+        waitForChange();
+
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1"), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2"), true);
+        try {
+            assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "sub1"), false);
+            fail();
+        } catch (Exception e) {}
+        try {
+            assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "sub2"), false);
+            fail();
+        } catch (Exception e) {}
+
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "role1-sub1"), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "role2-sub2"), true);
+
         admin.namespaces().deleteNamespace("p1/c1/ns1");
         admin.properties().deleteProperty("p1");
         admin.clusters().deleteCluster("c1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index a3b9487e3..7e7e9fc21 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1127,7 +1127,7 @@ public void testUnsupportedBatchMsgSubscribeCommand() throws Exception {
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
         AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
-        doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any());
+        doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1147,7 +1147,7 @@ public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
         AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
-        doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any());
+        doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 3049f94dd..a6eff5970 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -44,6 +44,7 @@
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index 5f267a8d8..c996c9f71 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -102,8 +102,8 @@ public void test() throws Exception {
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role"), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false);
 
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false);
 
@@ -111,7 +111,7 @@ public void test() throws Exception {
         waitForChange();
 
         assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
 
         admin.namespaces().deleteNamespace("p1/c1/ns1");
         admin.properties().deleteProperty("p1");
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index edd8c5ce7..ebe754c8e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -33,6 +33,7 @@
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 
 /**
  * Admin interface for namespaces management
@@ -835,4 +836,13 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle,
      *             Unexpected error
      */
     void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException;
+
+     /**
+     * Set the given subscription auth mode on all destinations on a namespace
+     *
+     * @param namespace
+     * @param subscriptionAuthMode
+     * @throws PulsarAdminException
+     */
+    void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 9f6467950..79aa162c4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -40,6 +40,7 @@
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 
 public class NamespacesImpl extends BaseResource implements Namespaces {
 
@@ -76,7 +77,7 @@ public NamespacesImpl(WebTarget web, Authentication auth) {
             NamespaceName ns = NamespaceName.get(namespace);
             return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
                     .path("destinations")).get(new GenericType<List<String>>() {
-                    });
+            });
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -148,8 +149,8 @@ public void deleteNamespaceBundle(String namespace, String bundleRange) throws P
             NamespaceName ns = NamespaceName.get(namespace);
             return request(
                     namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("permissions"))
-                            .get(new GenericType<Map<String, Set<AuthAction>>>() {
-                            });
+                    .get(new GenericType<Map<String, Set<AuthAction>>>() {
+                    });
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -184,8 +185,8 @@ public void revokePermissionsOnNamespace(String namespace, String role) throws P
             NamespaceName ns = NamespaceName.get(namespace);
             return request(
                     namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("replication"))
-                            .get(new GenericType<List<String>>() {
-                            });
+                    .get(new GenericType<List<String>>() {
+                    });
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -208,8 +209,8 @@ public int getNamespaceMessageTTL(String namespace) throws PulsarAdminException
             NamespaceName ns = NamespaceName.get(namespace);
             return request(
                     namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("messageTTL"))
-                            .get(new GenericType<Integer>() {
-                            });
+                    .get(new GenericType<Integer>() {
+                    });
 
         } catch (Exception e) {
             throw getApiException(e);
@@ -231,7 +232,8 @@ public void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws Pu
     public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
-            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("deduplication"))
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("deduplication"))
                     .post(Entity.entity(enableDeduplication, MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
@@ -244,7 +246,7 @@ public void setDeduplicationStatus(String namespace, boolean enableDeduplication
             NamespaceName ns = NamespaceName.get(namespace);
             return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
                     .path("backlogQuotaMap")).get(new GenericType<Map<BacklogQuotaType, BacklogQuota>>() {
-                    });
+            });
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -255,8 +257,8 @@ public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws
         try {
             NamespaceName ns = NamespaceName.get(namespace);
             request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
-                    .path("backlogQuota")).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON),
-                            ErrorData.class);
+                    .path("backlogQuota"))
+                    .post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -268,7 +270,7 @@ public void removeBacklogQuota(String namespace) throws PulsarAdminException {
             NamespaceName ns = NamespaceName.get(namespace);
             request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("backlogQuota")
                     .queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()))
-                            .delete(ErrorData.class);
+                    .delete(ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -291,7 +293,7 @@ public PersistencePolicies getPersistence(String namespace) throws PulsarAdminEx
             NamespaceName ns = NamespaceName.get(namespace);
             return request(
                     namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("persistence"))
-                            .get(PersistencePolicies.class);
+                    .get(PersistencePolicies.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -315,7 +317,7 @@ public RetentionPolicies getRetention(String namespace) throws PulsarAdminExcept
             NamespaceName ns = NamespaceName.get(namespace);
             return request(
                     namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("retention"))
-                            .get(RetentionPolicies.class);
+                    .get(RetentionPolicies.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -361,7 +363,7 @@ public void splitNamespaceBundle(String namespace, String bundle, boolean unload
             NamespaceName ns = NamespaceName.get(namespace);
             request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path(bundle)
                     .path("split").queryParam("unload", Boolean.toString(unloadSplitBundles)))
-                            .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+                    .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -371,7 +373,8 @@ public void splitNamespaceBundle(String namespace, String bundle, boolean unload
     public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
-            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("dispatchRate"))
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("dispatchRate"))
                     .post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
@@ -382,8 +385,8 @@ public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws
     public DispatchRate getDispatchRate(String namespace) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
-            return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("dispatchRate"))
-            .get(DispatchRate.class);
+            return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("dispatchRate")).get(DispatchRate.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -429,8 +432,8 @@ public void clearNamespaceBundleBacklogForSubscription(String namespace, String
         try {
             NamespaceName ns = NamespaceName.get(namespace);
             request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path(bundle)
-                    .path("clearBacklog").path(subscription)).post(Entity.entity("", MediaType.APPLICATION_JSON),
-                            ErrorData.class);
+                    .path("clearBacklog").path(subscription))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -453,8 +456,20 @@ public void unsubscribeNamespaceBundle(String namespace, String bundle, String s
         try {
             NamespaceName ns = NamespaceName.get(namespace);
             request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path(bundle)
-                    .path("unsubscribe").path(subscription)).post(Entity.entity("", MediaType.APPLICATION_JSON),
-                            ErrorData.class);
+                    .path("unsubscribe").path(subscription))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("subscriptionAuthMode"))
+                    .post(Entity.entity(subscriptionAuthMode, MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 9acf3bade..37d7306f9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -308,7 +309,7 @@ void run() throws PulsarAdminException {
 
         @Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}\n", required = true)
         private String bundle;
-        
+
         @Parameter(names = { "--unload",
                 "-u" }, description = "Unload newly split bundles after splitting old bundle", required = false)
         private boolean unload;
@@ -534,6 +535,21 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Set subscription auth mode on a namespace")
+    private class SetSubscriptionAuthMode extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-m", "--subscription-auth-mode" }, description = "subscription name", required = true)
+        private String mode;
+
+        @Override
+        void run() throws Exception {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setSubscriptionAuthMode(namespace, SubscriptionAuthMode.valueOf(mode));
+        }
+    }
+
     private static long validateSizeString(String s) {
         char last = s.charAt(s.length() - 1);
         String subStr = s.substring(0, s.length() - 1);
@@ -623,5 +639,6 @@ public CmdNamespaces(PulsarAdmin admin) {
         jcommander.addCommand("unsubscribe", new Unsubscribe());
 
         jcommander.addCommand("set-encryption-required", new SetEncryptionRequired());
+        jcommander.addCommand("set-subscription-auth-mode", new SetSubscriptionAuthMode());
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index e2b7b059f..a46b5fee0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -47,6 +47,7 @@
     public static final String LAST_BOUNDARY = "0xffffffff";
 
     public boolean encryption_required = false;
+    public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
 
     @Override
     public boolean equals(Object obj) {
@@ -61,7 +62,8 @@ public boolean equals(Object obj) {
                     && Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate)
                     && message_ttl_in_seconds == other.message_ttl_in_seconds
                     && Objects.equals(retention_policies, other.retention_policies)
-                    && Objects.equals(encryption_required, other.encryption_required);
+                    && Objects.equals(encryption_required, other.encryption_required)
+                    && Objects.equals(subscription_auth_mode, other.subscription_auth_mode);
         }
 
         return false;
@@ -86,7 +88,7 @@ public String toString() {
                 .add("latency_stats_sample_rate", latency_stats_sample_rate)
                 .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
                 .add("deleted", deleted)
-                .add("encryption_required", encryption_required).toString();
+                .add("encryption_required", encryption_required)
+                .add("subscription_auth_mode", subscription_auth_mode).toString();
     }
 }
-
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java
new file mode 100644
index 000000000..5a5714761
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Subscription authorization for Pulsar policies
+ */
+public enum SubscriptionAuthMode {
+    /** Every subscription name can be used by every role */
+    None,
+
+    /** Subscription name with auth role prefix can be used by the role */
+    Prefix,
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index c0ccf51b6..f6fd0eb63 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -279,7 +279,7 @@ private ConsumerConfiguration getConsumerConfiguration() {
 
     @Override
     protected Boolean isAuthorized(String authRole) throws Exception {
-        return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole);
+        return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription);
     }
 
     private static String extractSubscription(HttpServletRequest request) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index ddf23c571..eae771773 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -247,7 +247,7 @@ private ReaderConfiguration getReaderConfiguration() {
 
     @Override
     protected Boolean isAuthorized(String authRole) throws Exception {
-        return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole);
+        return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription);
     }
 
     private MessageId getMessageId() throws IOException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services