You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/12/01 02:44:59 UTC

[GitHub] [pulsar] zhaohaidao commented on a change in pull request #5767: Support batch authorization of partitioned topic

zhaohaidao commented on a change in pull request #5767: Support batch authorization of partitioned topic
URL: https://github.com/apache/pulsar/pull/5767#discussion_r352316028
 
 

 ##########
 File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 ##########
 @@ -312,4 +316,82 @@ public void testGetPartitionedTopicsList() throws KeeperException, InterruptedEx
         Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
         Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
     }
+
+    @Test
+    public void testGrantNonPartitionedTopic() {
+        final String topicName = "non-partitioned-topic";
+        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
+        String role = "role";
+        Set<AuthAction> expectActions = new HashSet<>();
+        expectActions.add(AuthAction.produce);
+        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
+        Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
+        Assert.assertEquals(permissions.get(role), expectActions);
+    }
+
+    @Test
+    public void testGrantPartitionedTopic() {
+        final String partitionedTopicName = "partitioned-topic";
+        final int numPartitions = 5;
+        LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
+        ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
+        doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
+        doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
 
 Review comment:
   Hi, I went through the grant and get logic for permissions and It seems current logic support get permissions for partitions of a topic. Pls help me check if my understanding is right.
   The permissions will be stored in a map named destination_auth as followed. 
   ```java
       private void grantPermissions(String topicUri, String role, Set<AuthAction> actions) {
           try {
               ...
               Policies policies = jsonMapper().readValue(content, Policies.class);
   
               if (!policies.auth_policies.destination_auth.containsKey(topicUri)) {
                   policies.auth_policies.destination_auth.put(topicUri, new TreeMap<String, Set<AuthAction>>());
               }
               policies.auth_policies.destination_auth.get(topicUri).put(role, actions);
   
               // Write the new policies to zookeeper
               globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policies),
                       nodeStat.getVersion());
           ...
           }
   ```
   Then get permissions logic for a partition: try to get permissions from  auth_policies.destination_auth directly by topic_name.
   ```java
   protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
           // This operation should be reading from zookeeper and it should be allowed without having admin privileges
           validateAdminAccessForTenant(namespaceName.getTenant());
   
           String topicUri = topicName.toString();
   
           try {
               ...
               // Then add topic level permissions
               if (auth.destination_auth.containsKey(topicUri)) {
                   for (Map.Entry<String, Set<AuthAction>> entry : auth.destination_auth.get(topicUri).entrySet()) {
                       String role = entry.getKey();
                       Set<AuthAction> topicPermissions = entry.getValue();
   
                       if (!permissions.containsKey(role)) {
                           permissions.put(role, topicPermissions);
                       } else {
                           // Do the union between namespace and topic level
                           Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
                           permissions.put(role, union);
                       }
                   }
               }
   
               return permissions;
           } catch (Exception e) {
               ...
           }
       }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services