You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/03/08 02:10:14 UTC

[pulsar] branch master updated: Instead of always using admin access for topic, use read/write/admin access for topic (#6504)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36ea153  Instead of always using admin access for topic, use read/write/admin access for topic (#6504)
36ea153 is described below

commit 36ea153c0ff4fc3e3f04de4a37b658daa9f116fa
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Sat Mar 7 18:10:03 2020 -0800

    Instead of always using admin access for topic, use read/write/admin access for topic (#6504)
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 64 +++++++++++++++++-----
 1 file changed, 51 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d74017c..685d195 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -234,6 +234,44 @@ public class PersistentTopicsBase extends AdminResource {
         validateTopicOwnership(topicName, authoritative);
     }
 
+    public void validateReadOperationOnTopic(boolean authoritative) {
+        validateTopicOwnership(topicName, authoritative);
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
+            }
+            validateAdminAccessForSubscriber("");
+        }
+    }
+
+    public void validateWriteOperationOnTopic(boolean authoritative) {
+        validateTopicOwnership(topicName, authoritative);
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
+            }
+            try {
+                if (!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, clientAppId(),
+                  clientAuthData())) {
+                    log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId());
+                    throw new RestException(Status.UNAUTHORIZED,
+                      String.format("Subscriber %s is not authorized to access this operation", clientAppId()));
+                }
+            } catch (RestException re) {
+                throw re;
+            } catch (Exception ex) {
+                // unknown error marked as internal server error
+                log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName,
+                  clientAppId(), e.getMessage(), ex);
+                throw new RestException(ex);
+            }
+        }
+    }
+
     protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) {
         validateTopicOwnership(topicName, authoritative);
         try {
@@ -317,7 +355,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalDeleteTopicForcefully(boolean authoritative) {
-        validateAdminOperationOnTopic(authoritative);
+        validateWriteOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
         try {
             topic.deleteForcefully().get();
@@ -391,7 +429,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalCreateNonPartitionedTopic(boolean authoritative) {
-        validateAdminAccessForTenant(topicName.getTenant());
+        validateWriteOperationOnTopic(authoritative);
         validateNonPartitionTopicName(topicName.getLocalName());
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
@@ -426,7 +464,7 @@ public class PersistentTopicsBase extends AdminResource {
      * @param numPartitions
      */
     protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) {
-        validateAdminAccessForTenant(topicName.getTenant());
+        validateWriteOperationOnTopic(false);
         // Only do the validation if it's the first hop.
         if (!updateLocalTopicOnly) {
             validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
@@ -540,7 +578,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
         try {
-            validateAdminAccessForTenant(topicName.getTenant());
+            validateWriteOperationOnTopic(authoritative);
         } catch (Exception e) {
             log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -738,7 +776,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalDeleteTopic(boolean authoritative) {
-        validateAdminOperationOnTopic(authoritative);
+        validateWriteOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
 
         // v2 topics have a global name so check if the topic is replicated.
@@ -825,7 +863,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
         try {
-            validateAdminOperationOnTopic(authoritative);
+            validateReadOperationOnTopic(authoritative);
             Topic topic = getTopicReference(topicName);
             final List<String> subscriptions = Lists.newArrayList();
             topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
@@ -1279,7 +1317,7 @@ public class PersistentTopicsBase extends AdminResource {
         // validate ownership and redirect if current broker is not owner
         PersistentTopic topic;
         try {
-            validateAdminOperationOnTopic(authoritative);
+            validateWriteOperationOnTopic(authoritative);
 
             topic = (PersistentTopic) getTopicReference(topicName);
         } catch (Exception e) {
@@ -1744,7 +1782,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
         }
-        validateAdminOperationOnTopic(authoritative);
+        validateWriteOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
         try {
             return ((PersistentTopic) topic).terminate().get();
@@ -1867,7 +1905,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalTriggerCompaction(boolean authoritative) {
-        validateAdminOperationOnTopic(authoritative);
+        validateWriteOperationOnTopic(authoritative);
 
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
@@ -1880,13 +1918,13 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
-        validateAdminOperationOnTopic(authoritative);
+        validateReadOperationOnTopic(authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         return topic.compactionStatus();
     }
 
     protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
-        validateAdminOperationOnTopic(authoritative);
+        validateWriteOperationOnTopic(authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
             topic.triggerOffload(messageId);
@@ -1899,7 +1937,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
-        validateAdminOperationOnTopic(authoritative);
+        validateReadOperationOnTopic(authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         return topic.offloadStatus();
     }
@@ -2237,7 +2275,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected MessageId internalGetLastMessageId(boolean authoritative) {
-        validateAdminOperationOnTopic(authoritative);
+        validateReadOperationOnTopic(authoritative);
 
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);