You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/20 03:50:14 UTC

[rocketmq] 02/03: [ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit a4a53706329f8b26650978bcd6319dbffd7bb4ed
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Wed Apr 20 20:01:21 2022 +0800

    [ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent
---
 .../rocketmq/acl/common/AclClientRPCHook.java      |  54 ++--------
 .../rocketmq/acl/common/AclClientRPCHookTest.java  | 118 +++++++++++++++++++++
 2 files changed, 128 insertions(+), 44 deletions(-)

diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
index 9e5bf1fb5..d4452a3f2 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
@@ -16,11 +16,9 @@
  */
 package org.apache.rocketmq.acl.common;
 
-import java.lang.reflect.Field;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -30,8 +28,6 @@ import static org.apache.rocketmq.acl.common.SessionCredentials.SIGNATURE;
 
 public class AclClientRPCHook implements RPCHook {
     private final SessionCredentials sessionCredentials;
-    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
-        new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
 
     public AclClientRPCHook(SessionCredentials sessionCredentials) {
         this.sessionCredentials = sessionCredentials;
@@ -39,16 +35,15 @@ public class AclClientRPCHook implements RPCHook {
 
     @Override
     public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
-        byte[] total = AclUtils.combineRequestContent(request,
-            parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
-        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
-        request.addExtField(SIGNATURE, signature);
+        // Add AccessKey and SecurityToken into signature calculating.
         request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
-        
-        // The SecurityToken value is unneccessary,user can choose this one.
+        // The SecurityToken value is unnecessary,user can choose this one.
         if (sessionCredentials.getSecurityToken() != null) {
             request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
         }
+        byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request));
+        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
+        request.addExtField(SIGNATURE, signature);
     }
 
     @Override
@@ -56,40 +51,11 @@ public class AclClientRPCHook implements RPCHook {
 
     }
 
-    protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
-        CommandCustomHeader header = request.readCustomHeader();
+    protected SortedMap<String, String> parseRequestContent(RemotingCommand request) {
+        request.makeCustomHeaderToNet();
+        Map<String, String> extFields = request.getExtFields();
         // Sort property
-        SortedMap<String, String> map = new TreeMap<String, String>();
-        map.put(ACCESS_KEY, ak);
-        if (securityToken != null) {
-            map.put(SECURITY_TOKEN, securityToken);
-        }
-        try {
-            // Add header properties
-            if (null != header) {
-                Field[] fields = fieldCache.get(header.getClass());
-                if (null == fields) {
-                    fields = header.getClass().getDeclaredFields();
-                    for (Field field : fields) {
-                        field.setAccessible(true);
-                    }
-                    Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
-                    if (null != tmp) {
-                        fields = tmp;
-                    }
-                }
-
-                for (Field field : fields) {
-                    Object value = field.get(header);
-                    if (null != value && !field.isSynthetic()) {
-                        map.put(field.getName(), value.toString());
-                    }
-                }
-            }
-            return map;
-        } catch (Exception e) {
-            throw new RuntimeException("incompatible exception.", e);
-        }
+        return new TreeMap<String, String>(extFields);
     }
 
     public SessionCredentials getSessionCredentials() {
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
new file mode 100644
index 000000000..8c0d57d62
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl.common;
+
+import java.lang.reflect.Field;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestType;
+import org.junit.Test;
+
+import static org.apache.rocketmq.acl.common.SessionCredentials.ACCESS_KEY;
+import static org.apache.rocketmq.acl.common.SessionCredentials.SECURITY_TOKEN;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AclClientRPCHookTest {
+    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
+        new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
+    private AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(null);
+
+    @Test
+    public void testParseRequestContent() {
+        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+        requestHeader.setConsumerGroup("group");
+        requestHeader.setTopic("topic");
+        requestHeader.setQueueId(1);
+        requestHeader.setQueueOffset(2L);
+        requestHeader.setMaxMsgNums(32);
+        requestHeader.setSysFlag(0);
+        requestHeader.setCommitOffset(0L);
+        requestHeader.setSuspendTimeoutMillis(15000L);
+        requestHeader.setSubVersion(0L);
+        requestHeader.setBrokerName("brokerName");
+        RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+        SortedMap<String, String> oldContent = oldVersionParseRequestContent(testPullRemotingCommand, "ak", null);
+        byte[] oldBytes = AclUtils.combineRequestContent(testPullRemotingCommand, oldContent);
+        testPullRemotingCommand.addExtField(ACCESS_KEY, "ak");
+        SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand);
+        byte[] newBytes = AclUtils.combineRequestContent(testPullRemotingCommand, content);
+        assertThat(newBytes).isEqualTo(oldBytes);
+    }
+
+    @Test
+    public void testParseRequestContentWithStreamRequestType() {
+        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+        requestHeader.setConsumerGroup("group");
+        requestHeader.setTopic("topic");
+        requestHeader.setQueueId(1);
+        requestHeader.setQueueOffset(2L);
+        requestHeader.setMaxMsgNums(32);
+        requestHeader.setSysFlag(0);
+        requestHeader.setCommitOffset(0L);
+        requestHeader.setSuspendTimeoutMillis(15000L);
+        requestHeader.setSubVersion(0L);
+        requestHeader.setBrokerName("brokerName");
+        RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+        testPullRemotingCommand.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode()));
+        testPullRemotingCommand.addExtField(ACCESS_KEY, "ak");
+        SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand);
+        assertThat(content.get(MixAll.REQ_T)).isEqualTo(String.valueOf(RequestType.STREAM.getCode()));
+    }
+
+    private SortedMap<String, String> oldVersionParseRequestContent(RemotingCommand request, String ak, String securityToken) {
+        CommandCustomHeader header = request.readCustomHeader();
+        // Sort property
+        SortedMap<String, String> map = new TreeMap<String, String>();
+        map.put(ACCESS_KEY, ak);
+        if (securityToken != null) {
+            map.put(SECURITY_TOKEN, securityToken);
+        }
+        try {
+            // Add header properties
+            if (null != header) {
+                Field[] fields = fieldCache.get(header.getClass());
+                if (null == fields) {
+                    fields = header.getClass().getDeclaredFields();
+                    for (Field field : fields) {
+                        field.setAccessible(true);
+                    }
+                    Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
+                    if (null != tmp) {
+                        fields = tmp;
+                    }
+                }
+
+                for (Field field : fields) {
+                    Object value = field.get(header);
+                    if (null != value && !field.isSynthetic()) {
+                        map.put(field.getName(), value.toString());
+                    }
+                }
+            }
+            return map;
+        } catch (Exception e) {
+            throw new RuntimeException("incompatible exception.", e);
+        }
+    }
+}
\ No newline at end of file