You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/01 10:37:56 UTC

[rocketmq] 05/11: 优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84降低到1.64%

This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 10428801df75187ba914e553dd341adf8a1ae6cb
Author: huangli <ar...@gmail.com>
AuthorDate: Fri Nov 5 17:48:40 2021 +0800

    优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84降低到1.64%
---
 .../processor/AbstractSendMessageProcessor.java    |  13 ---
 .../broker/processor/SendMessageProcessor.java     |   4 +-
 .../protocol/header/PullMessageRequestHeader.java  |  17 +++
 .../protocol/header/PullMessageResponseHeader.java |  10 ++
 .../header/SendMessageRequestHeaderV2.java         |  21 ++++
 .../protocol/header/SendMessageResponseHeader.java |  10 ++
 .../rocketmq/example/benchmark/BatchProducer.java  |   3 +
 .../rocketmq/example/benchmark/Consumer.java       |   3 +
 .../rocketmq/example/benchmark/Producer.java       |   3 +
 .../example/benchmark/TransactionProducer.java     |   3 +
 .../rocketmq/remoting/netty/NettyDecoder.java      |   5 +-
 .../rocketmq/remoting/netty/NettyEncoder.java      |   3 +-
 .../remoting/netty/NettyRemotingAbstract.java      |   1 +
 .../remoting/protocol/FastCodesHeader.java         |  11 ++
 .../remoting/protocol/RemotingCommand.java         |  59 ++++++----
 .../remoting/protocol/RocketMQSerializable.java    | 119 ++++++++++++++-------
 .../remoting/protocol/RemotingCommandTest.java     |  15 ++-
 .../protocol/RocketMQSerializableTest.java         |  68 +++++++++++-
 18 files changed, 284 insertions(+), 84 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 85cb705..1f0744e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -279,19 +279,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         this.sendMessageHookList = sendMessageHookList;
     }
 
-    protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
-        final RemotingCommand response) {
-        if (!request.isOnewayRPC()) {
-            try {
-                ctx.writeAndFlush(response);
-            } catch (Throwable e) {
-                log.error("SendMessageProcessor process request over, but response failed", e);
-                log.error(request.toString());
-                log.error(response.toString());
-            }
-        }
-    }
-
     public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
         SendMessageContext context) {
         if (hasSendMessageHook()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 994d596..0499695 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -545,8 +545,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             responseHeader.setQueueId(queueIdInt);
             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
 
-            doResponse(ctx, request, response);
-
             if (hasSendMessageHook()) {
                 sendMessageContext.setMsgId(responseHeader.getMsgId());
                 sendMessageContext.setQueueId(responseHeader.getQueueId());
@@ -561,7 +559,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 sendMessageContext.setCommercialSendSize(wroteSize);
                 sendMessageContext.setCommercialOwner(owner);
             }
-            return null;
+            return response;
         } else {
             if (hasSendMessageHook()) {
                 int wroteSize = request.getBody().length;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index adc32df..e351344 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -28,6 +28,8 @@ import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
+import io.netty.buffer.ByteBuf;
+
 public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String consumerGroup;
@@ -56,6 +58,21 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "consumerGroup", consumerGroup);
+        writeIfNotNull(out, "topic", topic);
+        writeIfNotNull(out, "queueId", queueId);
+        writeIfNotNull(out, "queueOffset", queueOffset);
+        writeIfNotNull(out, "maxMsgNums", maxMsgNums);
+        writeIfNotNull(out, "sysFlag", sysFlag);
+        writeIfNotNull(out, "commitOffset", commitOffset);
+        writeIfNotNull(out, "suspendTimeoutMillis", suspendTimeoutMillis);
+        writeIfNotNull(out, "subscription", subscription);
+        writeIfNotNull(out, "subVersion", subVersion);
+        writeIfNotNull(out, "expressionType", expressionType);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
         String str = fields.get("consumerGroup");
         checkNotNull(str, "the custom field <consumerGroup> is null");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index db7f24b..1ac5050 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
+import io.netty.buffer.ByteBuf;
+
 public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private Long suggestWhichBrokerId;
@@ -42,6 +44,14 @@ public class PullMessageResponseHeader implements CommandCustomHeader, FastCodes
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "suggestWhichBrokerId", suggestWhichBrokerId);
+        writeIfNotNull(out, "nextBeginOffset", nextBeginOffset);
+        writeIfNotNull(out, "minOffset", minOffset);
+        writeIfNotNull(out, "maxOffset", maxOffset);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
         String str = fields.get("suggestWhichBrokerId");
         checkNotNull(str, "the custom field <suggestWhichBrokerId> is null");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index de26947..f0cd9e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -25,6 +25,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * Use short variable name to speed up FastJson deserialization process.
  */
@@ -108,6 +110,25 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "a", a);
+        writeIfNotNull(out, "b", b);
+        writeIfNotNull(out, "c", c);
+        writeIfNotNull(out, "d", d);
+        writeIfNotNull(out, "e", e);
+        writeIfNotNull(out, "f", f);
+        writeIfNotNull(out, "g", g);
+        writeIfNotNull(out, "h", h);
+        writeIfNotNull(out, "i", i);
+        writeIfNotNull(out, "j", j);
+        writeIfNotNull(out, "k", k);
+        writeIfNotNull(out, "l", l);
+        writeIfNotNull(out, "m", m);
+        writeIfNotNull(out, "n", n);
+        writeIfNotNull(out, "o", o);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
 
         String str = fields.get("a");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 9d8786f..cc60e37 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
+import io.netty.buffer.ByteBuf;
+
 public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String msgId;
@@ -41,6 +43,14 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "msgId", msgId);
+        writeIfNotNull(out, "queueId", queueId);
+        writeIfNotNull(out, "queueOffset", queueOffset);
+        writeIfNotNull(out, "transactionId", transactionId);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
         String str = fields.get("msgId");
         checkNotNull(str, "the custom field <msgId> is null");
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index cf207cd..eadb9c3 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -46,6 +46,8 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 public class BatchProducer {
@@ -53,6 +55,7 @@ public class BatchProducer {
     private static byte[] msgBody;
 
     public static void main(String[] args) throws MQClientException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser());
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index d08795d..c9e64f3 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 import java.io.IOException;
@@ -49,6 +51,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Consumer {
 
     public static void main(String[] args) throws MQClientException, IOException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
         if (null == commandLine) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index c32e00e..bdef16e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 import java.util.Arrays;
@@ -50,6 +52,7 @@ public class Producer {
     private static byte[] msgBody;
 
     public static void main(String[] args) throws MQClientException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 5e2f287..be5ccf2 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 import java.io.UnsupportedEncodingException;
@@ -61,6 +63,7 @@ public class TransactionProducer {
     static final int MAX_CHECK_RESULT_IN_MSG = 20;
 
     public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser());
         TxSendConfig config = new TxSendConfig();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index f64ab2d..57ee601 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -44,10 +44,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
             if (null == frame) {
                 return null;
             }
-
-            ByteBuffer byteBuffer = frame.nioBuffer();
-
-            return RemotingCommand.decode(byteBuffer);
+            return RemotingCommand.decode(frame);
         } catch (Exception e) {
             log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
             RemotingUtil.closeChannel(ctx.channel());
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
index 4506e71..7463619 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
@@ -35,8 +35,7 @@ public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
     public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
         throws Exception {
         try {
-            ByteBuffer header = remotingCommand.encodeHeader();
-            out.writeBytes(header);
+            remotingCommand.fastEncodeHeader(out);
             byte[] body = remotingCommand.getBody();
             if (body != null) {
                 out.writeBytes(body);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b2e7294..eaa2e0d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -209,6 +209,7 @@ public abstract class NettyRemotingAbstract {
                                     if (response != null) {
                                         response.setOpaque(opaque);
                                         response.markResponseType();
+                                        response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
                                         try {
                                             ctx.writeAndFlush(response);
                                         } catch (Throwable e) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
index 4604ae1..f313da2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
@@ -21,6 +21,8 @@ import java.util.HashMap;
 
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
+import io.netty.buffer.ByteBuf;
+
 public interface FastCodesHeader {
     default void checkNotNull(String s, String msg) throws RemotingCommandException {
         if (s == null) {
@@ -28,6 +30,15 @@ public interface FastCodesHeader {
         }
     }
 
+    default void writeIfNotNull(ByteBuf out, String key, Object value) {
+        if (value != null) {
+            RocketMQSerializable.writeStr(out, true, key);
+            RocketMQSerializable.writeStr(out, false, value.toString());
+        }
+    }
+
+    public void encode(ByteBuf out);
+
     void decode(HashMap<String, String> fields) throws RemotingCommandException;
 
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 912eea5..d469d10 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -31,6 +31,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 public class RemotingCommand {
     public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
     public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
@@ -142,20 +145,21 @@ public class RemotingCommand {
     }
 
     public static RemotingCommand decode(final ByteBuffer byteBuffer) {
-        int length = byteBuffer.limit();
-        int oriHeaderLen = byteBuffer.getInt();
-        int headerLength = getHeaderLength(oriHeaderLen);
+        return decode(Unpooled.wrappedBuffer(byteBuffer));
+    }
 
-        byte[] headerData = new byte[headerLength];
-        byteBuffer.get(headerData);
+    public static RemotingCommand decode(final ByteBuf byteBuffer) {
+        int length = byteBuffer.readableBytes();
+        int oriHeaderLen = byteBuffer.readInt();
+        int headerLength = getHeaderLength(oriHeaderLen);
 
-        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
+        RemotingCommand cmd = headerDecode(byteBuffer, headerLength, getProtocolType(oriHeaderLen));
 
         int bodyLength = length - 4 - headerLength;
         byte[] bodyData = null;
         if (bodyLength > 0) {
             bodyData = new byte[bodyLength];
-            byteBuffer.get(bodyData);
+            byteBuffer.readBytes(bodyData);
         }
         cmd.body = bodyData;
 
@@ -166,14 +170,16 @@ public class RemotingCommand {
         return length & 0xFFFFFF;
     }
 
-    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+    private static RemotingCommand headerDecode(ByteBuf byteBuffer, int len, SerializeType type) {
         switch (type) {
             case JSON:
+                byte[] headerData = new byte[len];
+                byteBuffer.readBytes(headerData);
                 RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                 resultJson.setSerializeTypeCurrentRPC(type);
                 return resultJson;
             case ROCKETMQ:
-                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
+                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(byteBuffer);
                 resultRMQ.setSerializeTypeCurrentRPC(type);
                 return resultRMQ;
             default:
@@ -208,14 +214,8 @@ public class RemotingCommand {
         return true;
     }
 
-    public static byte[] markProtocolType(int source, SerializeType type) {
-        byte[] result = new byte[4];
-
-        result[0] = type.getCode();
-        result[1] = (byte) ((source >> 16) & 0xFF);
-        result[2] = (byte) ((source >> 8) & 0xFF);
-        result[3] = (byte) (source & 0xFF);
-        return result;
+    public static int markProtocolType(int source, SerializeType type) {
+        return (type.getCode() << 24) | (source & 0x00FFFFFF);
     }
 
     public void markResponseType() {
@@ -349,7 +349,7 @@ public class RemotingCommand {
         result.putInt(length);
 
         // header length
-        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+        result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC));
 
         // header data
         result.put(headerData);
@@ -401,6 +401,27 @@ public class RemotingCommand {
         }
     }
 
+    public void fastEncodeHeader(ByteBuf out) {
+        int bodySize = this.body != null ? this.body.length : 0;
+        int beginIndex = out.writerIndex();
+        // skip 8 bytes
+        out.writeLong(0);
+        int headerSize;
+        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+            if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
+                this.makeCustomHeaderToNet();
+            }
+            headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, out);
+        } else {
+            this.makeCustomHeaderToNet();
+            byte[] header = RemotingSerializable.encode(this);
+            headerSize = header.length;
+            out.writeBytes(header);
+        }
+        out.setInt(beginIndex, 4 + headerSize + bodySize);
+        out.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
+    }
+
     public ByteBuffer encodeHeader() {
         return encodeHeader(this.body != null ? this.body.length : 0);
     }
@@ -424,7 +445,7 @@ public class RemotingCommand {
         result.putInt(length);
 
         // header length
-        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+        result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC));
 
         // header data
         result.put(headerData);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
index 66119e0..ed8a28f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -18,12 +18,77 @@ package org.apache.rocketmq.remoting.protocol;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+
 public class RocketMQSerializable {
-    private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+    private static final Charset CHARSET_UTF8 = StandardCharsets.UTF_8;
+
+    public static void writeStr(ByteBuf buf, boolean useShortLength, String str) {
+        int lenIndex = buf.writerIndex();
+        if (useShortLength) {
+            buf.writeShort(0);
+        } else {
+            buf.writeInt(0);
+        }
+        int len = buf.writeCharSequence(str, StandardCharsets.UTF_8);
+        if (useShortLength) {
+            buf.setShort(lenIndex, len);
+        } else {
+            buf.setInt(lenIndex, len);
+        }
+    }
+
+    public static String readStr(ByteBuf buf, boolean useShortLength) {
+        int len = useShortLength ? buf.readShort() : buf.readInt();
+        if (len == 0) {
+            return null;
+        }
+        CharSequence cs = buf.readCharSequence(len, StandardCharsets.UTF_8);
+        return cs == null ? null : cs.toString();
+    }
+
+    public static int rocketMQProtocolEncode(RemotingCommand cmd, ByteBuf out) {
+        int beginIndex = out.writerIndex();
+        // int code(~32767)
+        out.writeShort(cmd.getCode());
+        // LanguageCode language
+        out.writeByte(cmd.getLanguage().getCode());
+        // int version(~32767)
+        out.writeShort(cmd.getVersion());
+        // int opaque
+        out.writeInt(cmd.getOpaque());
+        // int flag
+        out.writeInt(cmd.getFlag());
+        // String remark
+        String remark = cmd.getRemark();
+        if (remark != null && !remark.isEmpty()) {
+            writeStr(out, false, remark);
+        } else {
+            out.writeInt(0);
+        }
+
+        int mapLenIndex = out.writerIndex();
+        out.writeInt(0);
+        if (cmd.readCustomHeader() instanceof FastCodesHeader) {
+            ((FastCodesHeader) cmd.readCustomHeader()).encode(out);
+        }
+        HashMap<String, String> map = cmd.getExtFields();
+        if (map != null && !map.isEmpty()) {
+            map.forEach((k, v) -> {
+                if (k != null && v != null) {
+                    writeStr(out, true, k);
+                    writeStr(out, false, v);
+                }
+            });
+        }
+        out.setInt(mapLenIndex, out.writerIndex() - mapLenIndex - 4);
+        return out.writerIndex() - beginIndex;
+    }
 
     public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
         // String remark
@@ -133,58 +198,38 @@ public class RocketMQSerializable {
         return length;
     }
 
-    public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
+    public static RemotingCommand rocketMQProtocolDecode(final ByteBuf headerBuffer) {
         RemotingCommand cmd = new RemotingCommand();
-        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
         // int code(~32767)
-        cmd.setCode(headerBuffer.getShort());
+        cmd.setCode(headerBuffer.readShort());
         // LanguageCode language
-        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
+        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.readByte()));
         // int version(~32767)
-        cmd.setVersion(headerBuffer.getShort());
+        cmd.setVersion(headerBuffer.readShort());
         // int opaque
-        cmd.setOpaque(headerBuffer.getInt());
+        cmd.setOpaque(headerBuffer.readInt());
         // int flag
-        cmd.setFlag(headerBuffer.getInt());
+        cmd.setFlag(headerBuffer.readInt());
         // String remark
-        int remarkLength = headerBuffer.getInt();
-        if (remarkLength > 0) {
-            byte[] remarkContent = new byte[remarkLength];
-            headerBuffer.get(remarkContent);
-            cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
-        }
+        cmd.setRemark(readStr(headerBuffer, false));
 
         // HashMap<String, String> extFields
-        int extFieldsLength = headerBuffer.getInt();
+        int extFieldsLength = headerBuffer.readInt();
         if (extFieldsLength > 0) {
-            byte[] extFieldsBytes = new byte[extFieldsLength];
-            headerBuffer.get(extFieldsBytes);
-            cmd.setExtFields(mapDeserialize(extFieldsBytes));
+            cmd.setExtFields(mapDeserialize(headerBuffer, extFieldsLength));
         }
         return cmd;
     }
 
-    public static HashMap<String, String> mapDeserialize(byte[] bytes) {
-        if (bytes == null || bytes.length <= 0)
-            return null;
-
-        HashMap<String, String> map = new HashMap<String, String>();
-        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-
-        short keySize;
-        byte[] keyContent;
-        int valSize;
-        byte[] valContent;
-        while (byteBuffer.hasRemaining()) {
-            keySize = byteBuffer.getShort();
-            keyContent = new byte[keySize];
-            byteBuffer.get(keyContent);
+    public static HashMap<String, String> mapDeserialize(ByteBuf byteBuffer, int len) {
 
-            valSize = byteBuffer.getInt();
-            valContent = new byte[valSize];
-            byteBuffer.get(valContent);
+        HashMap<String, String> map = new HashMap<>();
+        int endIndex = byteBuffer.readerIndex() + len;
 
-            map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
+        while (byteBuffer.readerIndex() < endIndex) {
+            String k = readStr(byteBuffer, true);
+            String v = readStr(byteBuffer, false);
+            map.put(k, v);
         }
         return map;
     }
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 2bd41ce..a5d1993 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -31,7 +31,13 @@ public class RemotingCommandTest {
     public void testMarkProtocolType_JSONProtocolType() {
         int source = 261;
         SerializeType type = SerializeType.JSON;
-        byte[] result = RemotingCommand.markProtocolType(source, type);
+
+        byte[] result = new byte[4];
+        int x = RemotingCommand.markProtocolType(source, type);
+        result[0] = (byte) (x >> 24);
+        result[1] = (byte) (x >> 16);
+        result[2] = (byte) (x >> 8);
+        result[3] = (byte) x;
         assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5});
     }
 
@@ -39,7 +45,12 @@ public class RemotingCommandTest {
     public void testMarkProtocolType_ROCKETMQProtocolType() {
         int source = 16777215;
         SerializeType type = SerializeType.ROCKETMQ;
-        byte[] result = RemotingCommand.markProtocolType(source, type);
+        byte[] result = new byte[4];
+        int x = RemotingCommand.markProtocolType(source, type);
+        result[0] = (byte) (x >> 24);
+        result[1] = (byte) (x >> 16);
+        result[2] = (byte) (x >> 8);
+        result[3] = (byte) x;
         assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1});
     }
 
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
index f1db54f..83e3cae 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
@@ -17,10 +17,17 @@
 package org.apache.rocketmq.remoting.protocol;
 
 import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 public class RocketMQSerializableTest {
     @Test
     public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithoutExtFields() {
@@ -42,7 +49,7 @@ public class RocketMQSerializableTest {
         assertThat(parseToInt(result, 13)).isEqualTo(0); //empty remark
         assertThat(parseToInt(result, 17)).isEqualTo(0); //empty extFields
 
-        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
 
         assertThat(decodedCommand.getCode()).isEqualTo(code);
         assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -80,7 +87,7 @@ public class RocketMQSerializableTest {
 
         assertThat(parseToInt(result, 30)).isEqualTo(0); //empty extFields
 
-        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
 
         assertThat(decodedCommand.getCode()).isEqualTo(code);
         assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -115,10 +122,11 @@ public class RocketMQSerializableTest {
 
         byte[] extFieldsArray = new byte[14];
         System.arraycopy(result, 21, extFieldsArray, 0, 14);
-        HashMap<String, String> extFields = RocketMQSerializable.mapDeserialize(extFieldsArray);
+        HashMap<String, String> extFields =
+                RocketMQSerializable.mapDeserialize(Unpooled.wrappedBuffer(extFieldsArray), extFieldsArray.length);
         assertThat(extFields).contains(new HashMap.SimpleEntry("key", "value"));
 
-        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
 
         assertThat(decodedCommand.getCode()).isEqualTo(code);
         assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -150,4 +158,56 @@ public class RocketMQSerializableTest {
         return array[index] * 16777216 + array[++index] * 65536 + array[++index] * 256
             + array[++index];
     }
+
+    public static class MyHeader1 implements CommandCustomHeader {
+        private String str;
+        private int num;
+
+        @Override
+        public void checkFields() throws RemotingCommandException {
+        }
+
+        public String getStr() {
+            return str;
+        }
+
+        public void setStr(String str) {
+            this.str = str;
+        }
+
+        public int getNum() {
+            return num;
+        }
+
+        public void setNum(int num) {
+            this.num = num;
+        }
+    }
+
+    @Test
+    public void testFastEncode() throws Exception {
+        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16);
+        MyHeader1 header1 = new MyHeader1();
+        header1.setStr("s1");
+        header1.setNum(100);
+        RemotingCommand cmd = RemotingCommand.createRequestCommand(1, header1);
+        cmd.setRemark("remark");
+        cmd.setOpaque(1001);
+        cmd.setVersion(99);
+        cmd.setLanguage(LanguageCode.JAVA);
+        cmd.setFlag(3);
+        cmd.makeCustomHeaderToNet();
+        RocketMQSerializable.rocketMQProtocolEncode(cmd, buf);
+        RemotingCommand cmd2 = RocketMQSerializable.rocketMQProtocolDecode(buf);
+        assertThat(cmd2.getRemark()).isEqualTo("remark");
+        assertThat(cmd2.getCode()).isEqualTo(1);
+        assertThat(cmd2.getOpaque()).isEqualTo(1001);
+        assertThat(cmd2.getVersion()).isEqualTo(99);
+        assertThat(cmd2.getLanguage()).isEqualTo(LanguageCode.JAVA);
+        assertThat(cmd2.getFlag()).isEqualTo(3);
+
+        MyHeader1 h2 = (MyHeader1) cmd2.decodeCommandCustomHeader(MyHeader1.class);
+        assertThat(h2.getStr()).isEqualTo("s1");
+        assertThat(h2.getNum()).isEqualTo(100);
+    }
 }
\ No newline at end of file