You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/03 04:46:51 UTC
[dubbo] 02/03: Heartbeat skip serialize and deserialize (#7077)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 2.7.8.1-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 1373fe0401d22d203f398b94f00bf63213973663
Author: 张远征hd <ho...@163.com>
AuthorDate: Tue Feb 2 14:49:57 2021 +0800
Heartbeat skip serialize and deserialize (#7077)
---
.../remoting/exchange/codec/ExchangeCodec.java | 86 ++++++++++++++--------
.../dubbo/remoting/transport/CodecSupport.java | 64 ++++++++++++++++
.../dubbo/rpc/protocol/dubbo/DubboCodec.java | 23 ++++--
3 files changed, 137 insertions(+), 36 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 128ffe6..9367b80 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -38,6 +38,7 @@ import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
import org.apache.dubbo.remoting.transport.CodecSupport;
import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -148,19 +149,22 @@ public class ExchangeCodec extends TelnetCodec {
byte status = header[3];
res.setStatus(status);
try {
- ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
- if (res.isHeartbeat()) {
- data = decodeHeartbeatData(channel, in, is);
- } else if (res.isEvent()) {
- data = decodeEventData(channel, in, is);
+ if (res.isEvent()) {
+ byte[] eventPayload = CodecSupport.getPayload(is);
+ if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+ // heart beat response data is always null;
+ data = null;
+ } else {
+ data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+ }
} else {
- data = decodeResponseData(channel, in, getRequestData(id));
+ data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
}
res.setResult(data);
} else {
- res.setErrorMessage(in.readUTF());
+ res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
}
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
@@ -176,14 +180,17 @@ public class ExchangeCodec extends TelnetCodec {
req.setEvent(true);
}
try {
- ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
- if (req.isHeartbeat()) {
- data = decodeHeartbeatData(channel, in, is);
- } else if (req.isEvent()) {
- data = decodeEventData(channel, in, is);
+ if (req.isEvent()) {
+ byte[] eventPayload = CodecSupport.getPayload(is);
+ if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+ // heart beat response data is always null;
+ data = null;
+ } else {
+ data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+ }
} else {
- data = decodeRequestData(channel, in);
+ data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
}
req.setData(data);
} catch (Throwable t) {
@@ -231,16 +238,23 @@ public class ExchangeCodec extends TelnetCodec {
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
- ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
- if (req.isEvent()) {
- encodeEventData(channel, out, req.getData());
+
+ if (req.isHeartbeat()) {
+ // heartbeat request data is always null
+ bos.write(CodecSupport.getNullBytesOf(serialization));
} else {
- encodeRequestData(channel, out, req.getData(), req.getVersion());
- }
- out.flushBuffer();
- if (out instanceof Cleanable) {
- ((Cleanable) out).cleanup();
+ ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+ if (req.isEvent()) {
+ encodeEventData(channel, out, req.getData());
+ } else {
+ encodeRequestData(channel, out, req.getData(), req.getVersion());
+ }
+ out.flushBuffer();
+ if (out instanceof Cleanable) {
+ ((Cleanable) out).cleanup();
+ }
}
+
bos.flush();
bos.close();
int len = bos.writtenBytes();
@@ -274,21 +288,33 @@ public class ExchangeCodec extends TelnetCodec {
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
- ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+
// encode response data or error message.
if (status == Response.OK) {
- if (res.isHeartbeat()) {
- encodeEventData(channel, out, res.getResult());
- } else {
- encodeResponseData(channel, out, res.getResult(), res.getVersion());
+ if(res.isHeartbeat()){
+ // heartbeat response data is always null
+ bos.write(CodecSupport.getNullBytesOf(serialization));
+ }else {
+ ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+ if (res.isEvent()) {
+ encodeEventData(channel, out, res.getResult());
+ } else {
+ encodeResponseData(channel, out, res.getResult(), res.getVersion());
+ }
+ out.flushBuffer();
+ if (out instanceof Cleanable) {
+ ((Cleanable) out).cleanup();
+ }
}
} else {
+ ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
out.writeUTF(res.getErrorMessage());
+ out.flushBuffer();
+ if (out instanceof Cleanable) {
+ ((Cleanable) out).cleanup();
+ }
}
- out.flushBuffer();
- if (out instanceof Cleanable) {
- ((Cleanable) out).cleanup();
- }
+
bos.flush();
bos.close();
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index d8609b4..390b2e2 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -22,17 +22,25 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
import org.apache.dubbo.common.serialize.Serialization;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ServiceRepository;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ServiceRepository;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -42,6 +50,8 @@ public class CodecSupport {
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();
+ // Cache null object serialize results, for heartbeat request/response serialize use.
+ private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
static {
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
@@ -90,6 +100,60 @@ public class CodecSupport {
return s.deserialize(url, is);
}
+ /**
+ * Get the null object serialize result byte[] of Serialization from the cache,
+ * if not, generate it first.
+ *
+ * @param s Serialization Instances
+ * @return serialize result of null object
+ */
+ public static byte[] getNullBytesOf(Serialization s) {
+ return ID_NULLBYTES_MAP.computeIfAbsent(s.getContentTypeId(), k -> {
+ //Pre-generated Null object bytes
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] nullBytes = new byte[0];
+ try {
+ ObjectOutput out = s.serialize(null, baos);
+ out.writeObject(null);
+ out.flushBuffer();
+ nullBytes = baos.toByteArray();
+ baos.close();
+ } catch (Exception e) {
+ logger.warn("Serialization extension " + s.getClass().getName() + " not support serializing null object, return an empty bytes instead.");
+ }
+ return nullBytes;
+ });
+ }
+
+ /**
+ * Read all payload to byte[]
+ *
+ * @param is
+ * @return
+ * @throws IOException
+ */
+ public static byte[] getPayload(InputStream is) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = is.read(buffer)) > -1) {
+ baos.write(buffer, 0, len);
+ }
+ baos.flush();
+ return baos.toByteArray();
+ }
+
+ /**
+ * Check if payload is null object serialize result byte[] of serialization
+ *
+ * @param payload
+ * @param proto
+ * @return
+ */
+ public static boolean isHeartBeat(byte[] payload, byte proto) {
+ return Arrays.equals(payload, getNullBytesOf(getSerializationById(proto)));
+ }
+
public static void checkSerialization(String path, String version, Byte id) throws IOException {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ProviderModel providerModel = repository.lookupExportedServiceWithoutGroup(path + ":" + version);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 3cfad81..cb9acad 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -28,13 +28,12 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
-import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
-import org.apache.dubbo.remoting.transport.CodecSupport;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcInvocation;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -81,8 +80,14 @@ public class DubboCodec extends ExchangeCodec {
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
- ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
- data = decodeEventData(channel, in, is);
+ byte[] eventPayload = CodecSupport.getPayload(is);
+ if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+ // heart beat response data is always null;
+ data = null;
+ } else {
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+ data = decodeEventData(channel, in);
+ }
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
@@ -120,8 +125,14 @@ public class DubboCodec extends ExchangeCodec {
try {
Object data;
if (req.isEvent()) {
- ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
- data = decodeEventData(channel, in, is);
+ byte[] eventPayload = CodecSupport.getPayload(is);
+ if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+ // heart beat response data is always null;
+ data = null;
+ } else {
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+ data = decodeEventData(channel, in);
+ }
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {