You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/06 00:31:53 UTC

[pulsar] branch master updated: Fix authorization error if partition number of partitioned topic is updated. (#10300) (#10333)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c1d5162  Fix authorization error if partition number of partitioned topic is updated. (#10300) (#10333)
c1d5162 is described below

commit c1d516254f656e029d6daf6f4ca8914d20ff0d1b
Author: Shen Liu <li...@126.com>
AuthorDate: Thu May 6 08:31:08 2021 +0800

    Fix authorization error if partition number of partitioned topic is updated. (#10300) (#10333)
    
    Fixes #10300
    
    ### Motivation
    
    Fix the bug where, after updating the partition number of a partitioned topic that has a topic-level auth policy, new producers/consumers of this topic get an authorization error.
    
    ### Modifications
    
    In [`org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#checkPermission`](https://github.com/apache/pulsar/blob/889b9b8e5efc62d2d0cbc761205fba5759c97af0/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L394), if the current `topicName` is a partition of a partitioned topic, also check the permissions granted on its parent partitioned topic.
---
 .../authorization/PulsarAuthorizationProvider.java | 21 ++++-
 .../server/ProxyWithJwtAuthorizationTest.java      | 91 ++++++++++++++++++++++
 2 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index c7dd2f4..f0118b6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -434,16 +434,33 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                             return;
                         }
                     }
+
+                    // If the partition count of a partitioned topic that has a topic-level policy is updated,
+                    // the newly created partitions may not inherit the policy of the partitioned topic,
+                    // so also check the permissions granted on the partitioned topic itself.
+                    // For https://github.com/apache/pulsar/issues/10300
+                    if (topicName.isPartitioned()) {
+                        topicRoles = policies.get().auth_policies.destination_auth.get(topicName.getPartitionedTopicName());
+                        if (topicRoles != null) {
+                            // Topic has custom policy
+                            Set<AuthAction> topicActions = topicRoles.get(role);
+                            if (topicActions != null && topicActions.contains(action)) {
+                                // The role has topic level permission
+                                permissionFuture.complete(true);
+                                return;
+                            }
+                        }
+                    }
                 }
                 permissionFuture.complete(false);
             }).exceptionally(ex -> {
-                log.warn("Client  with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
+                log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
                         ex.getMessage());
                 permissionFuture.completeExceptionally(ex);
                 return null;
             });
         } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
+            log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
                     e.getMessage());
             permissionFuture.completeExceptionally(e);
         }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index f683adf..4a7d0b8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -203,6 +203,97 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
 
     /**
      * <pre>
+     * 1. Create a 2-partition topic and grant produce/consume permission to client role.
+     * 2. Use producer/consumer with client role to process the topic, which is fine.
+     * 3. Update the topic partition number to 4.
+     * 4. Use new producer/consumer with client role to process the topic.
+     * 5. Broker should authorize producer/consumer normally.
+     * </pre>
+     */
+    @Test
+    public void testUpdatePartitionNumAndReconnect() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy();
+        createAdminClient();
+        PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
+
+        String clusterName = "proxy-authorization";
+        String namespaceName = "my-property/my-ns";
+        String topicName = "persistent://my-property/my-ns/my-topic1";
+        String subscriptionName = "my-subscriber-name";
+
+        admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString()));
+
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet(), Sets.newHashSet(clusterName)));
+        admin.namespaces().createNamespace(namespaceName);
+        admin.topics().createPartitionedTopic(topicName, 2);
+        admin.topics().grantPermission(topicName, CLIENT_ROLE,
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        Consumer<byte[]> consumer = proxyClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName).subscribe();
+
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
+                .topic(topicName).create();
+        final int MSG_NUM = 10;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < MSG_NUM; i++) {
+            String message = "my-message-" + i;
+            messageSet.add(message);
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg;
+        Set<String> receivedMessageSet = Sets.newHashSet();
+        for (int i = 0; i < MSG_NUM; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            receivedMessageSet.add(expectedMessage);
+            consumer.acknowledgeAsync(msg);
+        }
+        Assert.assertEquals(messageSet, receivedMessageSet);
+        consumer.close();
+        producer.close();
+
+        // update partition num
+        admin.topics().updatePartitionedTopic(topicName, 4);
+
+        // produce/consume the topic again
+        consumer = proxyClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName).subscribe();
+        producer = proxyClient.newProducer(Schema.BYTES)
+                .topic(topicName).create();
+
+        messageSet.clear();
+        for (int i = 0; i < MSG_NUM; i++) {
+            String message = "my-message-" + i;
+            messageSet.add(message);
+            producer.send(message.getBytes());
+        }
+
+        receivedMessageSet.clear();
+        for (int i = 0; i < MSG_NUM; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            receivedMessageSet.add(expectedMessage);
+            consumer.acknowledgeAsync(msg);
+        }
+        Assert.assertEquals(messageSet, receivedMessageSet);
+        consumer.close();
+        producer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * <pre>
      * It verifies jwt + Authentication + Authorization (client -> proxy -> broker).
      * It also test `SubscriptionAuthMode.Prefix` mode.
      *