You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2023/03/24 02:16:59 UTC
[rocketmq] branch develop updated: Add rpc validatation for gRPC in PlainAccessResource (#6460)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8c4568569 Add rpc validatation for gRPC in PlainAccessResource (#6460)
8c4568569 is described below
commit 8c4568569cf073caed4ba4c7667bdc0e4d53d6c6
Author: Zhouxiang Zhan <zh...@apache.org>
AuthorDate: Fri Mar 24 10:16:52 2023 +0800
Add rpc validatation for gRPC in PlainAccessResource (#6460)
* including NotifyClientTerminationRequest, QueryRouteRequest, QueryAssignmentRequest, ChangeInvisibleDurationRequest
---
.../rocketmq/acl/plain/PlainAccessResource.java | 31 ++++++++++++++++++++--
1 file changed, 29 insertions(+), 2 deletions(-)
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 046a7d954..cdbd9ea9b 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -17,10 +17,15 @@
package org.apache.rocketmq.acl.plain;
import apache.rocketmq.v2.AckMessageRequest;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
+import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.EndTransactionRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.Message;
+import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.QueryAssignmentRequest;
+import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.SendMessageRequest;
@@ -213,8 +218,13 @@ public class PlainAccessResource implements AccessResource {
String rpcFullName = messageV3.getDescriptorForType().getFullName();
if (HeartbeatRequest.getDescriptor().getFullName().equals(rpcFullName)) {
HeartbeatRequest request = (HeartbeatRequest) messageV3;
- if (request.hasGroup()) {
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ if (ClientType.PUSH_CONSUMER.equals(request.getClientType())
+ || ClientType.SIMPLE_CONSUMER.equals(request.getClientType())) {
+ if (!request.hasGroup()) {
+ throw new AclException("Consumer heartbeat doesn't have group");
+ } else {
+ accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ }
}
} else if (SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
SendMessageRequest request = (SendMessageRequest) messageV3;
@@ -259,7 +269,24 @@ public class PlainAccessResource implements AccessResource {
accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB);
}
}
+ if (!command.getSettings().hasPublishing() && !command.getSettings().hasSubscription()) {
+ throw new AclException("settings command doesn't have publishing or subscription");
+ }
}
+ } else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ } else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ QueryRouteRequest request = (QueryRouteRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY);
+ } else if (QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ QueryAssignmentRequest request = (QueryAssignmentRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
+ } else if (ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
}
} catch (Throwable t) {
throw new AclException(t.getMessage(), t);