You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2023/03/24 02:16:59 UTC

[rocketmq] branch develop updated: Add rpc validatation for gRPC in PlainAccessResource (#6460)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8c4568569 Add rpc validatation for gRPC in PlainAccessResource (#6460)
8c4568569 is described below

commit 8c4568569cf073caed4ba4c7667bdc0e4d53d6c6
Author: Zhouxiang Zhan <zh...@apache.org>
AuthorDate: Fri Mar 24 10:16:52 2023 +0800

    Add rpc validatation for gRPC in PlainAccessResource (#6460)
    
    * including NotifyClientTerminationRequest, QueryRouteRequest, QueryAssignmentRequest, ChangeInvisibleDurationRequest
---
 .../rocketmq/acl/plain/PlainAccessResource.java    | 31 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 046a7d954..cdbd9ea9b 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -17,10 +17,15 @@
 package org.apache.rocketmq.acl.plain;
 
 import apache.rocketmq.v2.AckMessageRequest;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
+import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.EndTransactionRequest;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
 import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.Message;
+import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.QueryAssignmentRequest;
+import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Resource;
 import apache.rocketmq.v2.SendMessageRequest;
@@ -213,8 +218,13 @@ public class PlainAccessResource implements AccessResource {
             String rpcFullName = messageV3.getDescriptorForType().getFullName();
             if (HeartbeatRequest.getDescriptor().getFullName().equals(rpcFullName)) {
                 HeartbeatRequest request = (HeartbeatRequest) messageV3;
-                if (request.hasGroup()) {
-                    accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+                if (ClientType.PUSH_CONSUMER.equals(request.getClientType())
+                    || ClientType.SIMPLE_CONSUMER.equals(request.getClientType())) {
+                    if (!request.hasGroup()) {
+                        throw new AclException("Consumer heartbeat doesn't have group");
+                    } else {
+                        accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+                    }
                 }
             } else if (SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
                 SendMessageRequest request = (SendMessageRequest) messageV3;
@@ -259,7 +269,24 @@ public class PlainAccessResource implements AccessResource {
                             accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB);
                         }
                     }
+                    if (!command.getSettings().hasPublishing() && !command.getSettings().hasSubscription()) {
+                        throw new AclException("settings command doesn't have publishing or subscription");
+                    }
                 }
+            } else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+                NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3;
+                accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+            } else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+                QueryRouteRequest request = (QueryRouteRequest) messageV3;
+                accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY);
+            } else if (QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+                QueryAssignmentRequest request = (QueryAssignmentRequest) messageV3;
+                accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+                accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
+            } else if (ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+                ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) messageV3;
+                accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+                accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
             }
         } catch (Throwable t) {
             throw new AclException(t.getMessage(), t);