You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/20 03:50:12 UTC

[rocketmq] branch develop updated (fe5c39cdb -> f34c185e0)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


    from fe5c39cdb Generate legal JSON response conditionally (#4473)
     new 34eb2ce7d [ISSUE #3906] Mark stream-related request by RequestType
     new a4a537063 [ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent
     new f34c185e0 [ISSUE #3906] Add unit test

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../rocketmq/acl/common/AclClientRPCHook.java      |  54 ++--------
 .../rocketmq/acl/common/AclClientRPCHookTest.java  | 116 +++++++++++++++++++++
 .../org/apache/rocketmq/client/ClientConfig.java   |  25 ++++-
 .../client/consumer/DefaultLitePullConsumer.java   |   1 +
 .../client/consumer/DefaultMQPullConsumer.java     |   1 +
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   7 +-
 .../java/org/apache/rocketmq/common/MixAll.java    |   1 +
 .../rocketmq/common/rpchook/StreamTypeRPCHook.java |  17 +--
 .../{SerializeType.java => RequestType.java}       |  17 ++-
 ...{LanguageCodeTest.java => RequestTypeTest.java} |  18 ++--
 10 files changed, 187 insertions(+), 70 deletions(-)
 create mode 100644 acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
 copy remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java => common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java (61%)
 copy remoting/src/main/java/org/apache/rocketmq/remoting/protocol/{SerializeType.java => RequestType.java} (74%)
 copy remoting/src/test/java/org/apache/rocketmq/remoting/protocol/{LanguageCodeTest.java => RequestTypeTest.java} (75%)


[rocketmq] 03/03: [ISSUE #3906] Add unit test

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f34c185e05f5b1a1cc372fa72718d6b40ed6e5cc
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Jun 7 20:14:47 2022 +0800

    [ISSUE #3906] Add unit test
---
 .../rocketmq/acl/common/AclClientRPCHookTest.java  |  2 --
 .../remoting/protocol/RequestTypeTest.java         | 33 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
index 8c0d57d62..1dd94d8a1 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
@@ -50,7 +50,6 @@ public class AclClientRPCHookTest {
         requestHeader.setCommitOffset(0L);
         requestHeader.setSuspendTimeoutMillis(15000L);
         requestHeader.setSubVersion(0L);
-        requestHeader.setBrokerName("brokerName");
         RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
         SortedMap<String, String> oldContent = oldVersionParseRequestContent(testPullRemotingCommand, "ak", null);
         byte[] oldBytes = AclUtils.combineRequestContent(testPullRemotingCommand, oldContent);
@@ -72,7 +71,6 @@ public class AclClientRPCHookTest {
         requestHeader.setCommitOffset(0L);
         requestHeader.setSuspendTimeoutMillis(15000L);
         requestHeader.setSubVersion(0L);
-        requestHeader.setBrokerName("brokerName");
         RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
         testPullRemotingCommand.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode()));
         testPullRemotingCommand.addExtField(ACCESS_KEY, "ak");
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestTypeTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestTypeTest.java
new file mode 100644
index 000000000..7457926d7
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestTypeTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RequestTypeTest {
+    @Test
+    public void testValueOf() {
+        RequestType requestType = RequestType.valueOf(RequestType.STREAM.getCode());
+        assertThat(requestType).isEqualTo(RequestType.STREAM);
+
+        requestType = RequestType.valueOf((byte) 1);
+        assertThat(requestType).isNull();
+    }
+}
\ No newline at end of file


[rocketmq] 01/03: [ISSUE #3906] Mark stream-related request by RequestType

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 34eb2ce7d17caefea63132a4e30bc425594abddd
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Apr 19 11:16:31 2022 +0800

    [ISSUE #3906] Mark stream-related request by RequestType
---
 .../org/apache/rocketmq/client/ClientConfig.java   | 25 ++++++++++++-
 .../client/consumer/DefaultLitePullConsumer.java   |  1 +
 .../client/consumer/DefaultMQPullConsumer.java     |  1 +
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  7 +++-
 .../java/org/apache/rocketmq/common/MixAll.java    |  1 +
 .../rocketmq/common/rpchook/StreamTypeRPCHook.java | 34 ++++++++++++++++++
 .../rocketmq/remoting/protocol/RequestType.java    | 41 ++++++++++++++++++++++
 7 files changed, 108 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 4452bbdfa..eeb882673 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.utils.NameServerAddressUtils;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RequestType;
 
 /**
  * Client Common configuration
@@ -65,6 +66,12 @@ public class ClientConfig {
 
     private LanguageCode language = LanguageCode.JAVA;
 
+    /**
+     * Enabling the stream request type injects an RPCHook that adds the corresponding request type at the remoting layer.
+     * It also generates a different client id to prevent unexpected reuse of an MQClientInstance.
+     */
+    protected boolean enableStreamRequestType = false;
+
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
         sb.append(this.getClientIP());
@@ -76,6 +83,11 @@ public class ClientConfig {
             sb.append(this.unitName);
         }
 
+        if (enableStreamRequestType) {
+            sb.append("@");
+            sb.append(RequestType.STREAM);
+        }
+
         return sb.toString();
     }
 
@@ -160,6 +172,7 @@ public class ClientConfig {
         this.namespace = cc.namespace;
         this.language = cc.language;
         this.mqClientApiTimeout = cc.mqClientApiTimeout;
+        this.enableStreamRequestType = cc.enableStreamRequestType;
     }
 
     public ClientConfig cloneClientConfig() {
@@ -179,6 +192,7 @@ public class ClientConfig {
         cc.namespace = namespace;
         cc.language = language;
         cc.mqClientApiTimeout = mqClientApiTimeout;
+        cc.enableStreamRequestType = enableStreamRequestType;
         return cc;
     }
 
@@ -318,12 +332,21 @@ public class ClientConfig {
         this.mqClientApiTimeout = mqClientApiTimeout;
     }
 
+    public boolean isEnableStreamRequestType() {
+        return enableStreamRequestType;
+    }
+
+    public void setEnableStreamRequestType(boolean enableStreamRequestType) {
+        this.enableStreamRequestType = enableStreamRequestType;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
             + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]";
+            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout
+            + ", enableStreamRequestType=" + enableStreamRequestType + "]";
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 8c7f0f0b3..7799166f2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -220,6 +220,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.consumerGroup = consumerGroup;
+        this.enableStreamRequestType = true;
         defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 4784e72e1..5e2138e81 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -117,6 +117,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.consumerGroup = consumerGroup;
+        this.enableStreamRequestType = true;
         defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 245195f07..5d7e1685e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import com.alibaba.fastjson.JSON;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -152,6 +153,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerR
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -169,7 +171,6 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-import com.alibaba.fastjson.JSON;
 
 public class MQClientAPIImpl {
 
@@ -195,6 +196,10 @@ public class MQClientAPIImpl {
         this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
         this.clientRemotingProcessor = clientRemotingProcessor;
 
+        // Register the stream RPC hook first so that the request-type ext field it adds is already present when a later hook signs the request
+        if (clientConfig.isEnableStreamRequestType()) {
+            this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
+        }
         this.remotingClient.registerRPCHook(rpcHook);
         this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index cf28f2e22..b13a09f92 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -89,6 +89,7 @@ public class MixAll {
     public static final String REPLY_MESSAGE_FLAG = "reply";
     public static final String LMQ_PREFIX = "%LMQ%";
     public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
+    public static final String REQ_T = "ReqT";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     public static String getWSAddr() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java
new file mode 100644
index 000000000..7a74bc52f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.rpchook;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestType;
+
+public class StreamTypeRPCHook implements RPCHook {
+    @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+        request.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode()));
+    }
+
+    @Override public void doAfterResponse(String remoteAddr, RemotingCommand request,
+        RemotingCommand response) {
+
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java
new file mode 100644
index 000000000..65217d5b8
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+public enum RequestType {
+    STREAM((byte) 0);
+
+    private final byte code;
+
+    RequestType(byte code) {
+        this.code = code;
+    }
+
+    public static RequestType valueOf(byte code) {
+        for (RequestType requestType : RequestType.values()) {
+            if (requestType.getCode() == code) {
+                return requestType;
+            }
+        }
+        return null;
+    }
+
+    public byte getCode() {
+        return code;
+    }
+}


[rocketmq] 02/03: [ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit a4a53706329f8b26650978bcd6319dbffd7bb4ed
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Wed Apr 20 20:01:21 2022 +0800

    [ISSUE #3906] Add extFields to AclClientRPCHook.parseRequestContent
---
 .../rocketmq/acl/common/AclClientRPCHook.java      |  54 ++--------
 .../rocketmq/acl/common/AclClientRPCHookTest.java  | 118 +++++++++++++++++++++
 2 files changed, 128 insertions(+), 44 deletions(-)

diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
index 9e5bf1fb5..d4452a3f2 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
@@ -16,11 +16,9 @@
  */
 package org.apache.rocketmq.acl.common;
 
-import java.lang.reflect.Field;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -30,8 +28,6 @@ import static org.apache.rocketmq.acl.common.SessionCredentials.SIGNATURE;
 
 public class AclClientRPCHook implements RPCHook {
     private final SessionCredentials sessionCredentials;
-    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
-        new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
 
     public AclClientRPCHook(SessionCredentials sessionCredentials) {
         this.sessionCredentials = sessionCredentials;
@@ -39,16 +35,15 @@ public class AclClientRPCHook implements RPCHook {
 
     @Override
     public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
-        byte[] total = AclUtils.combineRequestContent(request,
-            parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
-        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
-        request.addExtField(SIGNATURE, signature);
+        // Add AccessKey and SecurityToken first so that they are included in the signature calculation.
         request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
-        
-        // The SecurityToken value is unneccessary,user can choose this one.
+        // The SecurityToken value is optional; the user may choose whether to provide it.
         if (sessionCredentials.getSecurityToken() != null) {
             request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
         }
+        byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request));
+        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
+        request.addExtField(SIGNATURE, signature);
     }
 
     @Override
@@ -56,40 +51,11 @@ public class AclClientRPCHook implements RPCHook {
 
     }
 
-    protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
-        CommandCustomHeader header = request.readCustomHeader();
+    protected SortedMap<String, String> parseRequestContent(RemotingCommand request) {
+        request.makeCustomHeaderToNet();
+        Map<String, String> extFields = request.getExtFields();
         // Sort property
-        SortedMap<String, String> map = new TreeMap<String, String>();
-        map.put(ACCESS_KEY, ak);
-        if (securityToken != null) {
-            map.put(SECURITY_TOKEN, securityToken);
-        }
-        try {
-            // Add header properties
-            if (null != header) {
-                Field[] fields = fieldCache.get(header.getClass());
-                if (null == fields) {
-                    fields = header.getClass().getDeclaredFields();
-                    for (Field field : fields) {
-                        field.setAccessible(true);
-                    }
-                    Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
-                    if (null != tmp) {
-                        fields = tmp;
-                    }
-                }
-
-                for (Field field : fields) {
-                    Object value = field.get(header);
-                    if (null != value && !field.isSynthetic()) {
-                        map.put(field.getName(), value.toString());
-                    }
-                }
-            }
-            return map;
-        } catch (Exception e) {
-            throw new RuntimeException("incompatible exception.", e);
-        }
+        return new TreeMap<String, String>(extFields);
     }
 
     public SessionCredentials getSessionCredentials() {
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
new file mode 100644
index 000000000..8c0d57d62
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclClientRPCHookTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl.common;
+
+import java.lang.reflect.Field;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestType;
+import org.junit.Test;
+
+import static org.apache.rocketmq.acl.common.SessionCredentials.ACCESS_KEY;
+import static org.apache.rocketmq.acl.common.SessionCredentials.SECURITY_TOKEN;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AclClientRPCHookTest {
+    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
+        new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
+    private AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(null);
+
+    @Test
+    public void testParseRequestContent() {
+        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+        requestHeader.setConsumerGroup("group");
+        requestHeader.setTopic("topic");
+        requestHeader.setQueueId(1);
+        requestHeader.setQueueOffset(2L);
+        requestHeader.setMaxMsgNums(32);
+        requestHeader.setSysFlag(0);
+        requestHeader.setCommitOffset(0L);
+        requestHeader.setSuspendTimeoutMillis(15000L);
+        requestHeader.setSubVersion(0L);
+        requestHeader.setBrokerName("brokerName");
+        RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+        SortedMap<String, String> oldContent = oldVersionParseRequestContent(testPullRemotingCommand, "ak", null);
+        byte[] oldBytes = AclUtils.combineRequestContent(testPullRemotingCommand, oldContent);
+        testPullRemotingCommand.addExtField(ACCESS_KEY, "ak");
+        SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand);
+        byte[] newBytes = AclUtils.combineRequestContent(testPullRemotingCommand, content);
+        assertThat(newBytes).isEqualTo(oldBytes);
+    }
+
+    @Test
+    public void testParseRequestContentWithStreamRequestType() {
+        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+        requestHeader.setConsumerGroup("group");
+        requestHeader.setTopic("topic");
+        requestHeader.setQueueId(1);
+        requestHeader.setQueueOffset(2L);
+        requestHeader.setMaxMsgNums(32);
+        requestHeader.setSysFlag(0);
+        requestHeader.setCommitOffset(0L);
+        requestHeader.setSuspendTimeoutMillis(15000L);
+        requestHeader.setSubVersion(0L);
+        requestHeader.setBrokerName("brokerName");
+        RemotingCommand testPullRemotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+        testPullRemotingCommand.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode()));
+        testPullRemotingCommand.addExtField(ACCESS_KEY, "ak");
+        SortedMap<String, String> content = aclClientRPCHook.parseRequestContent(testPullRemotingCommand);
+        assertThat(content.get(MixAll.REQ_T)).isEqualTo(String.valueOf(RequestType.STREAM.getCode()));
+    }
+
+    private SortedMap<String, String> oldVersionParseRequestContent(RemotingCommand request, String ak, String securityToken) {
+        CommandCustomHeader header = request.readCustomHeader();
+        // Sort property
+        SortedMap<String, String> map = new TreeMap<String, String>();
+        map.put(ACCESS_KEY, ak);
+        if (securityToken != null) {
+            map.put(SECURITY_TOKEN, securityToken);
+        }
+        try {
+            // Add header properties
+            if (null != header) {
+                Field[] fields = fieldCache.get(header.getClass());
+                if (null == fields) {
+                    fields = header.getClass().getDeclaredFields();
+                    for (Field field : fields) {
+                        field.setAccessible(true);
+                    }
+                    Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
+                    if (null != tmp) {
+                        fields = tmp;
+                    }
+                }
+
+                for (Field field : fields) {
+                    Object value = field.get(header);
+                    if (null != value && !field.isSynthetic()) {
+                        map.put(field.getName(), value.toString());
+                    }
+                }
+            }
+            return map;
+        } catch (Exception e) {
+            throw new RuntimeException("incompatible exception.", e);
+        }
+    }
+}
\ No newline at end of file