You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/01 10:37:56 UTC
[rocketmq] 05/11: 优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84%降低到1.64%
This is an automated email from the ASF dual-hosted git repository.
huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 10428801df75187ba914e553dd341adf8a1ae6cb
Author: huangli <ar...@gmail.com>
AuthorDate: Fri Nov 5 17:48:40 2021 +0800
优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84%降低到1.64%
---
.../processor/AbstractSendMessageProcessor.java | 13 ---
.../broker/processor/SendMessageProcessor.java | 4 +-
.../protocol/header/PullMessageRequestHeader.java | 17 +++
.../protocol/header/PullMessageResponseHeader.java | 10 ++
.../header/SendMessageRequestHeaderV2.java | 21 ++++
.../protocol/header/SendMessageResponseHeader.java | 10 ++
.../rocketmq/example/benchmark/BatchProducer.java | 3 +
.../rocketmq/example/benchmark/Consumer.java | 3 +
.../rocketmq/example/benchmark/Producer.java | 3 +
.../example/benchmark/TransactionProducer.java | 3 +
.../rocketmq/remoting/netty/NettyDecoder.java | 5 +-
.../rocketmq/remoting/netty/NettyEncoder.java | 3 +-
.../remoting/netty/NettyRemotingAbstract.java | 1 +
.../remoting/protocol/FastCodesHeader.java | 11 ++
.../remoting/protocol/RemotingCommand.java | 59 ++++++----
.../remoting/protocol/RocketMQSerializable.java | 119 ++++++++++++++-------
.../remoting/protocol/RemotingCommandTest.java | 15 ++-
.../protocol/RocketMQSerializableTest.java | 68 +++++++++++-
18 files changed, 284 insertions(+), 84 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 85cb705..1f0744e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -279,19 +279,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
this.sendMessageHookList = sendMessageHookList;
}
- protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
- final RemotingCommand response) {
- if (!request.isOnewayRPC()) {
- try {
- ctx.writeAndFlush(response);
- } catch (Throwable e) {
- log.error("SendMessageProcessor process request over, but response failed", e);
- log.error(request.toString());
- log.error(response.toString());
- }
- }
- }
-
public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
SendMessageContext context) {
if (hasSendMessageHook()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 994d596..0499695 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -545,8 +545,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
- doResponse(ctx, request, response);
-
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
@@ -561,7 +559,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
- return null;
+ return response;
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index adc32df..e351344 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -28,6 +28,8 @@ import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
+import io.netty.buffer.ByteBuf;
+
public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
@CFNotNull
private String consumerGroup;
@@ -56,6 +58,21 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
}
@Override
+ public void encode(ByteBuf out) {
+ writeIfNotNull(out, "consumerGroup", consumerGroup);
+ writeIfNotNull(out, "topic", topic);
+ writeIfNotNull(out, "queueId", queueId);
+ writeIfNotNull(out, "queueOffset", queueOffset);
+ writeIfNotNull(out, "maxMsgNums", maxMsgNums);
+ writeIfNotNull(out, "sysFlag", sysFlag);
+ writeIfNotNull(out, "commitOffset", commitOffset);
+ writeIfNotNull(out, "suspendTimeoutMillis", suspendTimeoutMillis);
+ writeIfNotNull(out, "subscription", subscription);
+ writeIfNotNull(out, "subVersion", subVersion);
+ writeIfNotNull(out, "expressionType", expressionType);
+ }
+
+ @Override
public void decode(HashMap<String, String> fields) throws RemotingCommandException {
String str = fields.get("consumerGroup");
checkNotNull(str, "the custom field <consumerGroup> is null");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index db7f24b..1ac5050 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
+import io.netty.buffer.ByteBuf;
+
public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
@CFNotNull
private Long suggestWhichBrokerId;
@@ -42,6 +44,14 @@ public class PullMessageResponseHeader implements CommandCustomHeader, FastCodes
}
@Override
+ public void encode(ByteBuf out) {
+ writeIfNotNull(out, "suggestWhichBrokerId", suggestWhichBrokerId);
+ writeIfNotNull(out, "nextBeginOffset", nextBeginOffset);
+ writeIfNotNull(out, "minOffset", minOffset);
+ writeIfNotNull(out, "maxOffset", maxOffset);
+ }
+
+ @Override
public void decode(HashMap<String, String> fields) throws RemotingCommandException {
String str = fields.get("suggestWhichBrokerId");
checkNotNull(str, "the custom field <suggestWhichBrokerId> is null");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index de26947..f0cd9e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -25,6 +25,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import io.netty.buffer.ByteBuf;
+
/**
* Use short variable name to speed up FastJson deserialization process.
*/
@@ -108,6 +110,25 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
}
@Override
+ public void encode(ByteBuf out) {
+ writeIfNotNull(out, "a", a);
+ writeIfNotNull(out, "b", b);
+ writeIfNotNull(out, "c", c);
+ writeIfNotNull(out, "d", d);
+ writeIfNotNull(out, "e", e);
+ writeIfNotNull(out, "f", f);
+ writeIfNotNull(out, "g", g);
+ writeIfNotNull(out, "h", h);
+ writeIfNotNull(out, "i", i);
+ writeIfNotNull(out, "j", j);
+ writeIfNotNull(out, "k", k);
+ writeIfNotNull(out, "l", l);
+ writeIfNotNull(out, "m", m);
+ writeIfNotNull(out, "n", n);
+ writeIfNotNull(out, "o", o);
+ }
+
+ @Override
public void decode(HashMap<String, String> fields) throws RemotingCommandException {
String str = fields.get("a");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 9d8786f..cc60e37 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
+import io.netty.buffer.ByteBuf;
+
public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
@CFNotNull
private String msgId;
@@ -41,6 +43,14 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes
}
@Override
+ public void encode(ByteBuf out) {
+ writeIfNotNull(out, "msgId", msgId);
+ writeIfNotNull(out, "queueId", queueId);
+ writeIfNotNull(out, "queueOffset", queueOffset);
+ writeIfNotNull(out, "transactionId", transactionId);
+ }
+
+ @Override
public void decode(HashMap<String, String> fields) throws RemotingCommandException {
String str = fields.get("msgId");
checkNotNull(str, "the custom field <msgId> is null");
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index cf207cd..eadb9c3 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -46,6 +46,8 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
import org.apache.rocketmq.srvutil.ServerUtil;
public class BatchProducer {
@@ -53,6 +55,7 @@ public class BatchProducer {
private static byte[] msgBody;
public static void main(String[] args) throws MQClientException {
+ System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser());
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index d08795d..c9e64f3 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.IOException;
@@ -49,6 +51,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class Consumer {
public static void main(String[] args) throws MQClientException, IOException {
+ System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index c32e00e..bdef16e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
import org.apache.rocketmq.srvutil.ServerUtil;
import java.util.Arrays;
@@ -50,6 +52,7 @@ public class Producer {
private static byte[] msgBody;
public static void main(String[] args) throws MQClientException {
+ System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 5e2f287..be5ccf2 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.UnsupportedEncodingException;
@@ -61,6 +63,7 @@ public class TransactionProducer {
static final int MAX_CHECK_RESULT_IN_MSG = 20;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
+ System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser());
TxSendConfig config = new TxSendConfig();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index f64ab2d..57ee601 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -44,10 +44,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
if (null == frame) {
return null;
}
-
- ByteBuffer byteBuffer = frame.nioBuffer();
-
- return RemotingCommand.decode(byteBuffer);
+ return RemotingCommand.decode(frame);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
index 4506e71..7463619 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
@@ -35,8 +35,7 @@ public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
- ByteBuffer header = remotingCommand.encodeHeader();
- out.writeBytes(header);
+ remotingCommand.fastEncodeHeader(out);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b2e7294..eaa2e0d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -209,6 +209,7 @@ public abstract class NettyRemotingAbstract {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
+ response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
index 4604ae1..f313da2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
@@ -21,6 +21,8 @@ import java.util.HashMap;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import io.netty.buffer.ByteBuf;
+
public interface FastCodesHeader {
default void checkNotNull(String s, String msg) throws RemotingCommandException {
if (s == null) {
@@ -28,6 +30,15 @@ public interface FastCodesHeader {
}
}
+ default void writeIfNotNull(ByteBuf out, String key, Object value) {
+ if (value != null) {
+ RocketMQSerializable.writeStr(out, true, key);
+ RocketMQSerializable.writeStr(out, false, value.toString());
+ }
+ }
+
+ public void encode(ByteBuf out);
+
void decode(HashMap<String, String> fields) throws RemotingCommandException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 912eea5..d469d10 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -31,6 +31,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
@@ -142,20 +145,21 @@ public class RemotingCommand {
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
- int length = byteBuffer.limit();
- int oriHeaderLen = byteBuffer.getInt();
- int headerLength = getHeaderLength(oriHeaderLen);
+ return decode(Unpooled.wrappedBuffer(byteBuffer));
+ }
- byte[] headerData = new byte[headerLength];
- byteBuffer.get(headerData);
+ public static RemotingCommand decode(final ByteBuf byteBuffer) {
+ int length = byteBuffer.readableBytes();
+ int oriHeaderLen = byteBuffer.readInt();
+ int headerLength = getHeaderLength(oriHeaderLen);
- RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
+ RemotingCommand cmd = headerDecode(byteBuffer, headerLength, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
- byteBuffer.get(bodyData);
+ byteBuffer.readBytes(bodyData);
}
cmd.body = bodyData;
@@ -166,14 +170,16 @@ public class RemotingCommand {
return length & 0xFFFFFF;
}
- private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+ private static RemotingCommand headerDecode(ByteBuf byteBuffer, int len, SerializeType type) {
switch (type) {
case JSON:
+ byte[] headerData = new byte[len];
+ byteBuffer.readBytes(headerData);
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
- RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
+ RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(byteBuffer);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
@@ -208,14 +214,8 @@ public class RemotingCommand {
return true;
}
- public static byte[] markProtocolType(int source, SerializeType type) {
- byte[] result = new byte[4];
-
- result[0] = type.getCode();
- result[1] = (byte) ((source >> 16) & 0xFF);
- result[2] = (byte) ((source >> 8) & 0xFF);
- result[3] = (byte) (source & 0xFF);
- return result;
+ public static int markProtocolType(int source, SerializeType type) {
+ return (type.getCode() << 24) | (source & 0x00FFFFFF);
}
public void markResponseType() {
@@ -349,7 +349,7 @@ public class RemotingCommand {
result.putInt(length);
// header length
- result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+ result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
@@ -401,6 +401,27 @@ public class RemotingCommand {
}
}
+ public void fastEncodeHeader(ByteBuf out) {
+ int bodySize = this.body != null ? this.body.length : 0;
+ int beginIndex = out.writerIndex();
+ // skip 8 bytes
+ out.writeLong(0);
+ int headerSize;
+ if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+ if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
+ this.makeCustomHeaderToNet();
+ }
+ headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, out);
+ } else {
+ this.makeCustomHeaderToNet();
+ byte[] header = RemotingSerializable.encode(this);
+ headerSize = header.length;
+ out.writeBytes(header);
+ }
+ out.setInt(beginIndex, 4 + headerSize + bodySize);
+ out.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
+ }
+
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
@@ -424,7 +445,7 @@ public class RemotingCommand {
result.putInt(length);
// header length
- result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+ result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
index 66119e0..ed8a28f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -18,12 +18,77 @@ package org.apache.rocketmq.remoting.protocol;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import io.netty.buffer.ByteBuf;
+
public class RocketMQSerializable {
- private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ private static final Charset CHARSET_UTF8 = StandardCharsets.UTF_8;
+
+ public static void writeStr(ByteBuf buf, boolean useShortLength, String str) {
+ int lenIndex = buf.writerIndex();
+ if (useShortLength) {
+ buf.writeShort(0);
+ } else {
+ buf.writeInt(0);
+ }
+ int len = buf.writeCharSequence(str, StandardCharsets.UTF_8);
+ if (useShortLength) {
+ buf.setShort(lenIndex, len);
+ } else {
+ buf.setInt(lenIndex, len);
+ }
+ }
+
+ public static String readStr(ByteBuf buf, boolean useShortLength) {
+ int len = useShortLength ? buf.readShort() : buf.readInt();
+ if (len == 0) {
+ return null;
+ }
+ CharSequence cs = buf.readCharSequence(len, StandardCharsets.UTF_8);
+ return cs == null ? null : cs.toString();
+ }
+
+ public static int rocketMQProtocolEncode(RemotingCommand cmd, ByteBuf out) {
+ int beginIndex = out.writerIndex();
+ // int code(~32767)
+ out.writeShort(cmd.getCode());
+ // LanguageCode language
+ out.writeByte(cmd.getLanguage().getCode());
+ // int version(~32767)
+ out.writeShort(cmd.getVersion());
+ // int opaque
+ out.writeInt(cmd.getOpaque());
+ // int flag
+ out.writeInt(cmd.getFlag());
+ // String remark
+ String remark = cmd.getRemark();
+ if (remark != null && !remark.isEmpty()) {
+ writeStr(out, false, remark);
+ } else {
+ out.writeInt(0);
+ }
+
+ int mapLenIndex = out.writerIndex();
+ out.writeInt(0);
+ if (cmd.readCustomHeader() instanceof FastCodesHeader) {
+ ((FastCodesHeader) cmd.readCustomHeader()).encode(out);
+ }
+ HashMap<String, String> map = cmd.getExtFields();
+ if (map != null && !map.isEmpty()) {
+ map.forEach((k, v) -> {
+ if (k != null && v != null) {
+ writeStr(out, true, k);
+ writeStr(out, false, v);
+ }
+ });
+ }
+ out.setInt(mapLenIndex, out.writerIndex() - mapLenIndex - 4);
+ return out.writerIndex() - beginIndex;
+ }
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
@@ -133,58 +198,38 @@ public class RocketMQSerializable {
return length;
}
- public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
+ public static RemotingCommand rocketMQProtocolDecode(final ByteBuf headerBuffer) {
RemotingCommand cmd = new RemotingCommand();
- ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
- cmd.setCode(headerBuffer.getShort());
+ cmd.setCode(headerBuffer.readShort());
// LanguageCode language
- cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
+ cmd.setLanguage(LanguageCode.valueOf(headerBuffer.readByte()));
// int version(~32767)
- cmd.setVersion(headerBuffer.getShort());
+ cmd.setVersion(headerBuffer.readShort());
// int opaque
- cmd.setOpaque(headerBuffer.getInt());
+ cmd.setOpaque(headerBuffer.readInt());
// int flag
- cmd.setFlag(headerBuffer.getInt());
+ cmd.setFlag(headerBuffer.readInt());
// String remark
- int remarkLength = headerBuffer.getInt();
- if (remarkLength > 0) {
- byte[] remarkContent = new byte[remarkLength];
- headerBuffer.get(remarkContent);
- cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
- }
+ cmd.setRemark(readStr(headerBuffer, false));
// HashMap<String, String> extFields
- int extFieldsLength = headerBuffer.getInt();
+ int extFieldsLength = headerBuffer.readInt();
if (extFieldsLength > 0) {
- byte[] extFieldsBytes = new byte[extFieldsLength];
- headerBuffer.get(extFieldsBytes);
- cmd.setExtFields(mapDeserialize(extFieldsBytes));
+ cmd.setExtFields(mapDeserialize(headerBuffer, extFieldsLength));
}
return cmd;
}
- public static HashMap<String, String> mapDeserialize(byte[] bytes) {
- if (bytes == null || bytes.length <= 0)
- return null;
-
- HashMap<String, String> map = new HashMap<String, String>();
- ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-
- short keySize;
- byte[] keyContent;
- int valSize;
- byte[] valContent;
- while (byteBuffer.hasRemaining()) {
- keySize = byteBuffer.getShort();
- keyContent = new byte[keySize];
- byteBuffer.get(keyContent);
+ public static HashMap<String, String> mapDeserialize(ByteBuf byteBuffer, int len) {
- valSize = byteBuffer.getInt();
- valContent = new byte[valSize];
- byteBuffer.get(valContent);
+ HashMap<String, String> map = new HashMap<>();
+ int endIndex = byteBuffer.readerIndex() + len;
- map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
+ while (byteBuffer.readerIndex() < endIndex) {
+ String k = readStr(byteBuffer, true);
+ String v = readStr(byteBuffer, false);
+ map.put(k, v);
}
return map;
}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 2bd41ce..a5d1993 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -31,7 +31,13 @@ public class RemotingCommandTest {
public void testMarkProtocolType_JSONProtocolType() {
int source = 261;
SerializeType type = SerializeType.JSON;
- byte[] result = RemotingCommand.markProtocolType(source, type);
+
+ byte[] result = new byte[4];
+ int x = RemotingCommand.markProtocolType(source, type);
+ result[0] = (byte) (x >> 24);
+ result[1] = (byte) (x >> 16);
+ result[2] = (byte) (x >> 8);
+ result[3] = (byte) x;
assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5});
}
@@ -39,7 +45,12 @@ public class RemotingCommandTest {
public void testMarkProtocolType_ROCKETMQProtocolType() {
int source = 16777215;
SerializeType type = SerializeType.ROCKETMQ;
- byte[] result = RemotingCommand.markProtocolType(source, type);
+ byte[] result = new byte[4];
+ int x = RemotingCommand.markProtocolType(source, type);
+ result[0] = (byte) (x >> 24);
+ result[1] = (byte) (x >> 16);
+ result[2] = (byte) (x >> 8);
+ result[3] = (byte) x;
assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1});
}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
index f1db54f..83e3cae 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
@@ -17,10 +17,17 @@
package org.apache.rocketmq.remoting.protocol;
import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
public class RocketMQSerializableTest {
@Test
public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithoutExtFields() {
@@ -42,7 +49,7 @@ public class RocketMQSerializableTest {
assertThat(parseToInt(result, 13)).isEqualTo(0); //empty remark
assertThat(parseToInt(result, 17)).isEqualTo(0); //empty extFields
- RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+ RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
assertThat(decodedCommand.getCode()).isEqualTo(code);
assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -80,7 +87,7 @@ public class RocketMQSerializableTest {
assertThat(parseToInt(result, 30)).isEqualTo(0); //empty extFields
- RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+ RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
assertThat(decodedCommand.getCode()).isEqualTo(code);
assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -115,10 +122,11 @@ public class RocketMQSerializableTest {
byte[] extFieldsArray = new byte[14];
System.arraycopy(result, 21, extFieldsArray, 0, 14);
- HashMap<String, String> extFields = RocketMQSerializable.mapDeserialize(extFieldsArray);
+ HashMap<String, String> extFields =
+ RocketMQSerializable.mapDeserialize(Unpooled.wrappedBuffer(extFieldsArray), extFieldsArray.length);
assertThat(extFields).contains(new HashMap.SimpleEntry("key", "value"));
- RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+ RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
assertThat(decodedCommand.getCode()).isEqualTo(code);
assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -150,4 +158,56 @@ public class RocketMQSerializableTest {
return array[index] * 16777216 + array[++index] * 65536 + array[++index] * 256
+ array[++index];
}
+
+ public static class MyHeader1 implements CommandCustomHeader {
+ private String str;
+ private int num;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public String getStr() {
+ return str;
+ }
+
+ public void setStr(String str) {
+ this.str = str;
+ }
+
+ public int getNum() {
+ return num;
+ }
+
+ public void setNum(int num) {
+ this.num = num;
+ }
+ }
+
+ @Test
+ public void testFastEncode() throws Exception {
+ ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16);
+ MyHeader1 header1 = new MyHeader1();
+ header1.setStr("s1");
+ header1.setNum(100);
+ RemotingCommand cmd = RemotingCommand.createRequestCommand(1, header1);
+ cmd.setRemark("remark");
+ cmd.setOpaque(1001);
+ cmd.setVersion(99);
+ cmd.setLanguage(LanguageCode.JAVA);
+ cmd.setFlag(3);
+ cmd.makeCustomHeaderToNet();
+ RocketMQSerializable.rocketMQProtocolEncode(cmd, buf);
+ RemotingCommand cmd2 = RocketMQSerializable.rocketMQProtocolDecode(buf);
+ assertThat(cmd2.getRemark()).isEqualTo("remark");
+ assertThat(cmd2.getCode()).isEqualTo(1);
+ assertThat(cmd2.getOpaque()).isEqualTo(1001);
+ assertThat(cmd2.getVersion()).isEqualTo(99);
+ assertThat(cmd2.getLanguage()).isEqualTo(LanguageCode.JAVA);
+ assertThat(cmd2.getFlag()).isEqualTo(3);
+
+ MyHeader1 h2 = (MyHeader1) cmd2.decodeCommandCustomHeader(MyHeader1.class);
+ assertThat(h2.getStr()).isEqualTo("s1");
+ assertThat(h2.getNum()).isEqualTo(100);
+ }
}
\ No newline at end of file