You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/06 00:31:53 UTC
[pulsar] branch master updated: Fix authorization error if
partition number of partitioned topic is updated. (#10300) (#10333)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1d5162 Fix authorization error if partition number of partitioned topic is updated. (#10300) (#10333)
c1d5162 is described below
commit c1d516254f656e029d6daf6f4ca8914d20ff0d1b
Author: Shen Liu <li...@126.com>
AuthorDate: Thu May 6 08:31:08 2021 +0800
Fix authorization error if partition number of partitioned topic is updated. (#10300) (#10333)
Fixes #10300
### Motivation
Fix a bug where, after the partition number of a partitioned topic that has a topic-level auth policy is updated, new producers/consumers of that topic fail with an authorization error.
### Modifications
In [`org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#checkPermission`](https://github.com/apache/pulsar/blob/889b9b8e5efc62d2d0cbc761205fba5759c97af0/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L394), if the current `topicName` is a single partition of a partitioned topic, also check the permissions granted on its parent partitioned topic.
---
.../authorization/PulsarAuthorizationProvider.java | 21 ++++-
.../server/ProxyWithJwtAuthorizationTest.java | 91 ++++++++++++++++++++++
2 files changed, 110 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index c7dd2f4..f0118b6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -434,16 +434,33 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
return;
}
}
+
+ // If the partition number of a partitioned topic that has a topic-level policy is updated,
+ // the newly created partitions may not inherit the policy of the partitioned topic,
+ // so also check the permission granted on the partitioned topic itself.
+ // For https://github.com/apache/pulsar/issues/10300
+ if (topicName.isPartitioned()) {
+ topicRoles = policies.get().auth_policies.destination_auth.get(topicName.getPartitionedTopicName());
+ if (topicRoles != null) {
+ // Topic has custom policy
+ Set<AuthAction> topicActions = topicRoles.get(role);
+ if (topicActions != null && topicActions.contains(action)) {
+ // The role has topic level permission
+ permissionFuture.complete(true);
+ return;
+ }
+ }
+ }
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
- log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
+ log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
- log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
+ log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
e.getMessage());
permissionFuture.completeExceptionally(e);
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index f683adf..4a7d0b8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -203,6 +203,97 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
/**
* <pre>
+ * 1. Create a 2-partition topic and grant produce/consume permission to client role.
+ * 2. Use producer/consumer with client role to process the topic, which is fine.
+ * 3. Update the topic partition number to 4.
+ * 4. Use new producer/consumer with client role to process the topic.
+ * 5. Broker should authorize producer/consumer normally.
+ * </pre>
+ */
+ @Test
+ public void testUpdatePartitionNumAndReconnect() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ startProxy();
+ createAdminClient();
+ PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
+
+ String clusterName = "proxy-authorization";
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+ String subscriptionName = "my-subscriber-name";
+
+ admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString()));
+
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet(), Sets.newHashSet(clusterName)));
+ admin.namespaces().createNamespace(namespaceName);
+ admin.topics().createPartitionedTopic(topicName, 2);
+ admin.topics().grantPermission(topicName, CLIENT_ROLE,
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ Consumer<byte[]> consumer = proxyClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName).subscribe();
+
+ Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
+ .topic(topicName).create();
+ final int MSG_NUM = 10;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < MSG_NUM; i++) {
+ String message = "my-message-" + i;
+ messageSet.add(message);
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg;
+ Set<String> receivedMessageSet = Sets.newHashSet();
+ for (int i = 0; i < MSG_NUM; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg); // receive() timed out: fail here instead of with an NPE below
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ receivedMessageSet.add(receivedMessage); // record the delivered payload so the assertion is meaningful
+ consumer.acknowledgeAsync(msg);
+ }
+ Assert.assertEquals(messageSet, receivedMessageSet);
+ consumer.close();
+ producer.close();
+
+ // Grow the topic from 2 to 4 partitions; permission was granted on the partitioned topic only
+ admin.topics().updatePartitionedTopic(topicName, 4);
+
+ // Reconnect with a new consumer/producer: authorization must still succeed on all partitions
+ consumer = proxyClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName).subscribe();
+ producer = proxyClient.newProducer(Schema.BYTES)
+ .topic(topicName).create();
+
+ messageSet.clear();
+ for (int i = 0; i < MSG_NUM; i++) {
+ String message = "my-message-" + i;
+ messageSet.add(message);
+ producer.send(message.getBytes());
+ }
+
+ receivedMessageSet.clear();
+ for (int i = 0; i < MSG_NUM; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg); // receive() timed out: fail here instead of with an NPE below
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ receivedMessageSet.add(receivedMessage); // record the delivered payload so the assertion is meaningful
+ consumer.acknowledgeAsync(msg);
+ }
+ Assert.assertEquals(messageSet, receivedMessageSet);
+ consumer.close();
+ producer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ /**
+ * <pre>
* It verifies jwt + Authentication + Authorization (client -> proxy -> broker).
* It also test `SubscriptionAuthMode.Prefix` mode.
*