You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/03/08 02:10:14 UTC
[pulsar] branch master updated: Instead of always using admin
access for topic, use read/write/admin access for topic (#6504)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36ea153 Instead of always using admin access for topic, use read/write/admin access for topic (#6504)
36ea153 is described below
commit 36ea153c0ff4fc3e3f04de4a37b658daa9f116fa
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Sat Mar 7 18:10:03 2020 -0800
Instead of always using admin access for topic, use read/write/admin access for topic (#6504)
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../broker/admin/impl/PersistentTopicsBase.java | 64 +++++++++++++++++-----
1 file changed, 51 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d74017c..685d195 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -234,6 +234,44 @@ public class PersistentTopicsBase extends AdminResource {
validateTopicOwnership(topicName, authoritative);
}
+ public void validateReadOperationOnTopic(boolean authoritative) {
+ validateTopicOwnership(topicName, authoritative);
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
+ }
+ validateAdminAccessForSubscriber("");
+ }
+ }
+
+ public void validateWriteOperationOnTopic(boolean authoritative) {
+ validateTopicOwnership(topicName, authoritative);
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
+ }
+ try {
+ if (!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, clientAppId(),
+ clientAuthData())) {
+ log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId());
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Subscriber %s is not authorized to access this operation", clientAppId()));
+ }
+ } catch (RestException re) {
+ throw re;
+ } catch (Exception ex) {
+ // unknown error marked as internal server error
+ log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName,
+ clientAppId(), e.getMessage(), ex);
+ throw new RestException(ex);
+ }
+ }
+ }
+
protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
@@ -317,7 +355,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalDeleteTopicForcefully(boolean authoritative) {
- validateAdminOperationOnTopic(authoritative);
+ validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
topic.deleteForcefully().get();
@@ -391,7 +429,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalCreateNonPartitionedTopic(boolean authoritative) {
- validateAdminAccessForTenant(topicName.getTenant());
+ validateWriteOperationOnTopic(authoritative);
validateNonPartitionTopicName(topicName.getLocalName());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
@@ -426,7 +464,7 @@ public class PersistentTopicsBase extends AdminResource {
* @param numPartitions
*/
protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) {
- validateAdminAccessForTenant(topicName.getTenant());
+ validateWriteOperationOnTopic(false);
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
@@ -540,7 +578,7 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
try {
- validateAdminAccessForTenant(topicName.getTenant());
+ validateWriteOperationOnTopic(authoritative);
} catch (Exception e) {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -738,7 +776,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalDeleteTopic(boolean authoritative) {
- validateAdminOperationOnTopic(authoritative);
+ validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
// v2 topics have a global name so check if the topic is replicated.
@@ -825,7 +863,7 @@ public class PersistentTopicsBase extends AdminResource {
private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
try {
- validateAdminOperationOnTopic(authoritative);
+ validateReadOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
@@ -1279,7 +1317,7 @@ public class PersistentTopicsBase extends AdminResource {
// validate ownership and redirect if current broker is not owner
PersistentTopic topic;
try {
- validateAdminOperationOnTopic(authoritative);
+ validateWriteOperationOnTopic(authoritative);
topic = (PersistentTopic) getTopicReference(topicName);
} catch (Exception e) {
@@ -1744,7 +1782,7 @@ public class PersistentTopicsBase extends AdminResource {
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
}
- validateAdminOperationOnTopic(authoritative);
+ validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
return ((PersistentTopic) topic).terminate().get();
@@ -1867,7 +1905,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalTriggerCompaction(boolean authoritative) {
- validateAdminOperationOnTopic(authoritative);
+ validateWriteOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
@@ -1880,13 +1918,13 @@ public class PersistentTopicsBase extends AdminResource {
}
protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
- validateAdminOperationOnTopic(authoritative);
+ validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.compactionStatus();
}
protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
- validateAdminOperationOnTopic(authoritative);
+ validateWriteOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
topic.triggerOffload(messageId);
@@ -1899,7 +1937,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
- validateAdminOperationOnTopic(authoritative);
+ validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.offloadStatus();
}
@@ -2237,7 +2275,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected MessageId internalGetLastMessageId(boolean authoritative) {
- validateAdminOperationOnTopic(authoritative);
+ validateReadOperationOnTopic(authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);