You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/20 03:50:14 UTC
[rocketmq] 02/03: [ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a4a53706329f8b26650978bcd6319dbffd7bb4ed
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Wed Apr 20 20:01:21 2022 +0800
[ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent
---
.../rocketmq/acl/common/AclClientRPCHook.java | 54 ++--------
.../rocketmq/acl/common/AclClientRPCHookTest.java | 118 +++++++++++++++++++++
2 files changed, 128 insertions(+), 44 deletions(-)
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
index 9e5bf1fb5..d4452a3f2 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
@@ -16,11 +16,9 @@
*/
package org.apache.rocketmq.acl.common;
-import java.lang.reflect.Field;
+import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -30,8 +28,6 @@ import static org.apache.rocketmq.acl.common.SessionCredentials.SIGNATURE;
public class AclClientRPCHook implements RPCHook {
private final SessionCredentials sessionCredentials;
- protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
- new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
public AclClientRPCHook(SessionCredentials sessionCredentials) {
this.sessionCredentials = sessionCredentials;
@@ -39,16 +35,15 @@ public class AclClientRPCHook implements RPCHook {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
- byte[] total = AclUtils.combineRequestContent(request,
- parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
- String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
- request.addExtField(SIGNATURE, signature);
+ // Add AccessKey and SecurityToken to the ext fields first so they are included in the signature calculation.
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
-
- // The SecurityToken value is unneccessary,user can choose this one.
+ // The SecurityToken value is optional; the user may choose whether to supply it.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
+ byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request));
+ String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
+ request.addExtField(SIGNATURE, signature);
}
@Override
@@ -56,40 +51,11 @@ public class AclClientRPCHook implements RPCHook {
}
- protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
- CommandCustomHeader header = request.readCustomHeader();
+ protected SortedMap<String, String> parseRequestContent(RemotingCommand request) {
+ request.makeCustomHeaderToNet();
+ Map<String, String> extFields = request.getExtFields();
// Sort property
- SortedMap<String, String> map = new TreeMap<String, String>();
- map.put(ACCESS_KEY, ak);
- if (securityToken != null) {
- map.put(SECURITY_TOKEN, securityToken);
- }
- try {
- // Add header properties
- if (null != header) {
- Field[] fields = fieldCache.get(header.getClass());
- if (null == fields) {
- fields = header.getClass().getDeclaredFields();
- for (Field field : fields) {
- field.setAccessible(true);
- }
- Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
- if (null != tmp) {
- fields = tmp;
- }
- }
-
- for (Field field : fields) {
- Object value = field.get(header);
- if (null != value && !field.isSynthetic()) {
- map.put(field.getName(), value.toString());
- }
- }
- }
- return map;
- } catch (Exception e) {
- throw new RuntimeException("incompatible exception.", e);
- }
+ return new TreeMap<String, String>(extFields);
}
public SessionCredentials getSessionCredentials() {
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
new file mode 100644
index 000000000..8c0d57d62
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl.common;
+
+import java.lang.reflect.Field;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestType;
+import org.junit.Test;
+
+import static org.apache.rocketmq.acl.common.SessionCredentials.ACCESS_KEY;
+import static org.apache.rocketmq.acl.common.SessionCredentials.SECURITY_TOKEN;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AclClientRPCHookTest {
+ protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
+ new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
+ private AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(null);
+
+ @Test
+ public void testParseRequestContent() {
+ PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+ requestHeader.setConsumerGroup("group");
+ requestHeader.setTopic("topic");
+ requestHeader.setQueueId(1);
+ requestHeader.setQueueOffset(2L);
+ requestHeader.setMaxMsgNums(32);
+ requestHeader.setSysFlag(0);
+ requestHeader.setCommitOffset(0L);
+ requestHeader.setSuspendTimeoutMillis(15000L);
+ requestHeader.setSubVersion(0L);
+ requestHeader.setBrokerName("brokerName");
+ RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+ SortedMap<String, String> oldContent = oldVersionParseRequestContent(testPullRemotingCommand, "ak", null);
+ byte[] oldBytes = AclUtils.combineRequestContent(testPullRemotingCommand, oldContent);
+ testPullRemotingCommand.addExtField(ACCESS_KEY, "ak");
+ SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand);
+ byte[] newBytes = AclUtils.combineRequestContent(testPullRemotingCommand, content);
+ assertThat(newBytes).isEqualTo(oldBytes);
+ }
+
+ @Test
+ public void testParseRequestContentWithStreamRequestType() {
+ PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+ requestHeader.setConsumerGroup("group");
+ requestHeader.setTopic("topic");
+ requestHeader.setQueueId(1);
+ requestHeader.setQueueOffset(2L);
+ requestHeader.setMaxMsgNums(32);
+ requestHeader.setSysFlag(0);
+ requestHeader.setCommitOffset(0L);
+ requestHeader.setSuspendTimeoutMillis(15000L);
+ requestHeader.setSubVersion(0L);
+ requestHeader.setBrokerName("brokerName");
+ RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+ testPullRemotingCommand.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode()));
+ testPullRemotingCommand.addExtField(ACCESS_KEY, "ak");
+ SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand);
+ assertThat(content.get(MixAll.REQ_T)).isEqualTo(String.valueOf(RequestType.STREAM.getCode()));
+ }
+
+ private SortedMap<String, String> oldVersionParseRequestContent(RemotingCommand request, String ak, String securityToken) {
+ CommandCustomHeader header = request.readCustomHeader();
+ // Sort property
+ SortedMap<String, String> map = new TreeMap<String, String>();
+ map.put(ACCESS_KEY, ak);
+ if (securityToken != null) {
+ map.put(SECURITY_TOKEN, securityToken);
+ }
+ try {
+ // Add header properties
+ if (null != header) {
+ Field[] fields = fieldCache.get(header.getClass());
+ if (null == fields) {
+ fields = header.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ field.setAccessible(true);
+ }
+ Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
+ if (null != tmp) {
+ fields = tmp;
+ }
+ }
+
+ for (Field field : fields) {
+ Object value = field.get(header);
+ if (null != value && !field.isSynthetic()) {
+ map.put(field.getName(), value.toString());
+ }
+ }
+ }
+ return map;
+ } catch (Exception e) {
+ throw new RuntimeException("incompatible exception.", e);
+ }
+ }
+}
\ No newline at end of file