You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/02 06:51:04 UTC

[dubbo] branch master updated: Heartbeat skip serialize and deserialize (#7077)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new f4b225e  Heartbeat skip serialize and deserialize (#7077)
f4b225e is described below

commit f4b225eb3a5acdf7c9064763f522ea0b86421c8d
Author: 张远征hd <ho...@163.com>
AuthorDate: Tue Feb 2 14:49:57 2021 +0800

    Heartbeat skip serialize and deserialize (#7077)
---
 .../remoting/exchange/codec/ExchangeCodec.java     | 86 ++++++++++++++--------
 .../dubbo/remoting/transport/CodecSupport.java     | 59 +++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       | 21 +++++-
 3 files changed, 132 insertions(+), 34 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 1ea6c10..42b0279 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -38,6 +38,7 @@ import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
 import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -148,19 +149,22 @@ public class ExchangeCodec extends TelnetCodec {
             byte status = header[3];
             res.setStatus(status);
             try {
-                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 if (status == Response.OK) {
                     Object data;
-                    if (res.isHeartbeat()) {
-                        data = decodeHeartbeatData(channel, in);
-                    } else if (res.isEvent()) {
-                        data = decodeEventData(channel, in);
+                    if (res.isEvent()) {
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                            // heart beat response data is always null;
+                            data = null;
+                        } else {
+                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                        }
                     } else {
-                        data = decodeResponseData(channel, in, getRequestData(id));
+                        data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
                     }
                     res.setResult(data);
                 } else {
-                    res.setErrorMessage(in.readUTF());
+                    res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
                 }
             } catch (Throwable t) {
                 res.setStatus(Response.CLIENT_ERROR);
@@ -176,14 +180,17 @@ public class ExchangeCodec extends TelnetCodec {
                 req.setEvent(true);
             }
             try {
-                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 Object data;
-                if (req.isHeartbeat()) {
-                    data = decodeHeartbeatData(channel, in);
-                } else if (req.isEvent()) {
-                    data = decodeEventData(channel, in);
+                if (req.isEvent()) {
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                        // heart beat request data is always null;
+                        data = null;
+                    } else {
+                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                    }
                 } else {
-                    data = decodeRequestData(channel, in);
+                    data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
                 }
                 req.setData(data);
             } catch (Throwable t) {
@@ -231,16 +238,23 @@ public class ExchangeCodec extends TelnetCodec {
         int savedWriteIndex = buffer.writerIndex();
         buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
         ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
-        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
-        if (req.isEvent()) {
-            encodeEventData(channel, out, req.getData());
+
+        if (req.isHeartbeat()) {
+            // heartbeat request data is always null
+            bos.write(CodecSupport.getNullBytesOf(serialization));
         } else {
-            encodeRequestData(channel, out, req.getData(), req.getVersion());
-        }
-        out.flushBuffer();
-        if (out instanceof Cleanable) {
-            ((Cleanable) out).cleanup();
+            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+            if (req.isEvent()) {
+                encodeEventData(channel, out, req.getData());
+            } else {
+                encodeRequestData(channel, out, req.getData(), req.getVersion());
+            }
+            out.flushBuffer();
+            if (out instanceof Cleanable) {
+                ((Cleanable) out).cleanup();
+            }
         }
+
         bos.flush();
         bos.close();
         int len = bos.writtenBytes();
@@ -274,21 +288,33 @@ public class ExchangeCodec extends TelnetCodec {
 
             buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
             ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
-            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+
             // encode response data or error message.
             if (status == Response.OK) {
-                if (res.isHeartbeat()) {
-                    encodeEventData(channel, out, res.getResult());
-                } else {
-                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
+                if(res.isHeartbeat()){
+                    // heartbeat response data is always null
+                    bos.write(CodecSupport.getNullBytesOf(serialization));
+                }else {
+                    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+                    if (res.isEvent()) {
+                        encodeEventData(channel, out, res.getResult());
+                    } else {
+                        encodeResponseData(channel, out, res.getResult(), res.getVersion());
+                    }
+                    out.flushBuffer();
+                    if (out instanceof Cleanable) {
+                        ((Cleanable) out).cleanup();
+                    }
                 }
             } else {
+                ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
                 out.writeUTF(res.getErrorMessage());
+                out.flushBuffer();
+                if (out instanceof Cleanable) {
+                    ((Cleanable) out).cleanup();
+                }
             }
-            out.flushBuffer();
-            if (out instanceof Cleanable) {
-                ((Cleanable) out).cleanup();
-            }
+
             bos.flush();
             bos.close();
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index 8c74fe5..dd6d270 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -22,11 +22,14 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
 import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.remoting.Constants;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -41,6 +44,8 @@ public class CodecSupport {
     private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
     private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
     private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();
+    // Cache null object serialize results, for heartbeat request/response serialize use.
+    private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
 
     static {
         Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
@@ -92,4 +97,58 @@ public class CodecSupport {
         Serialization s = getSerialization(url, proto);
         return s.deserialize(url, is);
     }
+
+    /**
+     * Returns the bytes that serialization {@code s} produces for a {@code null} object,
+     * computing them on first use and caching them by {@code s.getContentTypeId()}.
+     * Synchronized because the backing cache is a plain, unsynchronized {@code HashMap}.
+     *
+     * @param s serialization whose encoding of {@code null} is wanted
+     * @return the cached null bytes, or an empty array if serializing {@code null} failed
+     */
+    public static synchronized byte[] getNullBytesOf(Serialization s) {
+        return ID_NULLBYTES_MAP.computeIfAbsent(s.getContentTypeId(), k -> {
+            // Serialize null once; heartbeat encoding reuses these bytes verbatim.
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            byte[] nullBytes = new byte[0];
+            try {
+                ObjectOutput out = s.serialize(null, baos);
+                out.writeObject(null);
+                out.flushBuffer();
+                nullBytes = baos.toByteArray();
+            } catch (Exception e) {
+                // Keep the best-effort fallback, but log the cause so the failure can be diagnosed.
+                logger.warn("Serialization extension " + s.getClass().getName() + " not support serializing null object, return an empty bytes instead.", e);
+            }
+            return nullBytes;
+        });
+    }
+
+    /**
+     * Drains {@code is} until end of stream and returns everything that was read.
+     * The stream itself is not closed here.
+     *
+     * @param is stream to read to exhaustion
+     * @return all bytes read from {@code is}, in order (an empty array if there were none)
+     * @throws IOException if reading from {@code is} fails
+     */
+    public static byte[] getPayload(InputStream is) throws IOException {
+        ByteArrayOutputStream collected = new ByteArrayOutputStream();
+        byte[] chunk = new byte[1024];
+        for (int n = is.read(chunk); n > -1; n = is.read(chunk)) {
+            collected.write(chunk, 0, n);
+        }
+        collected.flush();
+        return collected.toByteArray();
+    }
+
+    /**
+     * Tells whether an event payload is byte-for-byte the serialized form of {@code null}.
+     * @param payload raw event body to test
+     * @param proto   serialization id handed to {@code getSerializationById}
+     * @return {@code true} if {@code payload} equals that serialization's cached null bytes
+     */
+    public static boolean isHeartBeat(byte[] payload, byte proto) {
+        byte[] nullBytes = getNullBytesOf(getSerializationById(proto));
+        return Arrays.equals(payload, nullBytes);
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 889f2ce..ef51a3f 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -33,6 +33,7 @@ import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcInvocation;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -79,8 +80,14 @@ public class DubboCodec extends ExchangeCodec {
                 if (status == Response.OK) {
                     Object data;
                     if (res.isEvent()) {
-                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                        data = decodeEventData(channel, in);
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                            // heart beat response data is always null;
+                            data = null;
+                        } else {
+                            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+                            data = decodeEventData(channel, in);
+                        }
                     } else {
                         DecodeableRpcResult result;
                         if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
@@ -118,8 +125,14 @@ public class DubboCodec extends ExchangeCodec {
             try {
                 Object data;
                 if (req.isEvent()) {
-                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                    data = decodeEventData(channel, in);
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                        // heart beat request data is always null;
+                        data = null;
+                    } else {
+                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+                        data = decodeEventData(channel, in);
+                    }
                 } else {
                     DecodeableRpcInvocation inv;
                     if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {