You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/03 04:46:51 UTC

[dubbo] 02/03: Heartbeat skip serialize and deserialize (#7077)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.7.8.1-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 1373fe0401d22d203f398b94f00bf63213973663
Author: 张远征hd <ho...@163.com>
AuthorDate: Tue Feb 2 14:49:57 2021 +0800

    Heartbeat skip serialize and deserialize (#7077)
---
 .../remoting/exchange/codec/ExchangeCodec.java     | 86 ++++++++++++++--------
 .../dubbo/remoting/transport/CodecSupport.java     | 64 ++++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       | 23 ++++--
 3 files changed, 137 insertions(+), 36 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 128ffe6..9367b80 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -38,6 +38,7 @@ import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
 import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -148,19 +149,22 @@ public class ExchangeCodec extends TelnetCodec {
             byte status = header[3];
             res.setStatus(status);
             try {
-                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 if (status == Response.OK) {
                     Object data;
-                    if (res.isHeartbeat()) {
-                        data = decodeHeartbeatData(channel, in, is);
-                    } else if (res.isEvent()) {
-                        data = decodeEventData(channel, in, is);
+                    if (res.isEvent()) {
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                            // heart beat response data is always null;
+                            data = null;
+                        } else {
+                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                        }
                     } else {
-                        data = decodeResponseData(channel, in, getRequestData(id));
+                        data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
                     }
                     res.setResult(data);
                 } else {
-                    res.setErrorMessage(in.readUTF());
+                    res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
                 }
             } catch (Throwable t) {
                 res.setStatus(Response.CLIENT_ERROR);
@@ -176,14 +180,17 @@ public class ExchangeCodec extends TelnetCodec {
                 req.setEvent(true);
             }
             try {
-                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 Object data;
-                if (req.isHeartbeat()) {
-                    data = decodeHeartbeatData(channel, in, is);
-                } else if (req.isEvent()) {
-                    data = decodeEventData(channel, in, is);
+                if (req.isEvent()) {
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                        // heart beat response data is always null;
+                        data = null;
+                    } else {
+                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                    }
                 } else {
-                    data = decodeRequestData(channel, in);
+                    data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
                 }
                 req.setData(data);
             } catch (Throwable t) {
@@ -231,16 +238,23 @@ public class ExchangeCodec extends TelnetCodec {
         int savedWriteIndex = buffer.writerIndex();
         buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
         ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
-        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
-        if (req.isEvent()) {
-            encodeEventData(channel, out, req.getData());
+
+        if (req.isHeartbeat()) {
+            // heartbeat request data is always null
+            bos.write(CodecSupport.getNullBytesOf(serialization));
         } else {
-            encodeRequestData(channel, out, req.getData(), req.getVersion());
-        }
-        out.flushBuffer();
-        if (out instanceof Cleanable) {
-            ((Cleanable) out).cleanup();
+            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+            if (req.isEvent()) {
+                encodeEventData(channel, out, req.getData());
+            } else {
+                encodeRequestData(channel, out, req.getData(), req.getVersion());
+            }
+            out.flushBuffer();
+            if (out instanceof Cleanable) {
+                ((Cleanable) out).cleanup();
+            }
         }
+
         bos.flush();
         bos.close();
         int len = bos.writtenBytes();
@@ -274,21 +288,33 @@ public class ExchangeCodec extends TelnetCodec {
 
             buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
             ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
-            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+
             // encode response data or error message.
             if (status == Response.OK) {
-                if (res.isHeartbeat()) {
-                    encodeEventData(channel, out, res.getResult());
-                } else {
-                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
+                if(res.isHeartbeat()){
+                    // heartbeat response data is always null
+                    bos.write(CodecSupport.getNullBytesOf(serialization));
+                }else {
+                    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+                    if (res.isEvent()) {
+                        encodeEventData(channel, out, res.getResult());
+                    } else {
+                        encodeResponseData(channel, out, res.getResult(), res.getVersion());
+                    }
+                    out.flushBuffer();
+                    if (out instanceof Cleanable) {
+                        ((Cleanable) out).cleanup();
+                    }
                 }
             } else {
+                ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
                 out.writeUTF(res.getErrorMessage());
+                out.flushBuffer();
+                if (out instanceof Cleanable) {
+                    ((Cleanable) out).cleanup();
+                }
             }
-            out.flushBuffer();
-            if (out instanceof Cleanable) {
-                ((Cleanable) out).cleanup();
-            }
+
             bos.flush();
             bos.close();
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index d8609b4..390b2e2 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -22,17 +22,25 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
 import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ProviderModel;
 import org.apache.dubbo.rpc.model.ServiceRepository;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ServiceRepository;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,6 +50,8 @@ public class CodecSupport {
     private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
     private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
     private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();
+    // Cache null object serialize results, for heartbeat request/response serialize use.
+    private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
 
     static {
         Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
@@ -90,6 +100,60 @@ public class CodecSupport {
         return s.deserialize(url, is);
     }
 
+    /**
+     * Get the null object serialize result byte[] of Serialization from the cache,
+     * if not, generate it first.
+     *
+     * @param s Serialization Instances
+     * @return serialize result of null object
+     */
+    public static byte[] getNullBytesOf(Serialization s) {
+        return ID_NULLBYTES_MAP.computeIfAbsent(s.getContentTypeId(), k -> {
+            //Pre-generated Null object bytes
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            byte[] nullBytes = new byte[0];
+            try {
+                ObjectOutput out = s.serialize(null, baos);
+                out.writeObject(null);
+                out.flushBuffer();
+                nullBytes = baos.toByteArray();
+                baos.close();
+            } catch (Exception e) {
+                logger.warn("Serialization extension " + s.getClass().getName() + " not support serializing null object, return an empty bytes instead.");
+            }
+            return nullBytes;
+        });
+    }
+
+    /**
+     * Read all payload to byte[]
+     *
+     * @param is
+     * @return
+     * @throws IOException
+     */
+    public static byte[] getPayload(InputStream is) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int len;
+        while ((len = is.read(buffer)) > -1) {
+            baos.write(buffer, 0, len);
+        }
+        baos.flush();
+        return baos.toByteArray();
+    }
+
+    /**
+     * Check if payload is null object serialize result byte[] of serialization
+     *
+     * @param payload
+     * @param proto
+     * @return
+     */
+    public static boolean isHeartBeat(byte[] payload, byte proto) {
+        return Arrays.equals(payload, getNullBytesOf(getSerializationById(proto)));
+    }
+
     public static void checkSerialization(String path, String version, Byte id) throws IOException {
         ServiceRepository repository = ApplicationModel.getServiceRepository();
         ProviderModel providerModel = repository.lookupExportedServiceWithoutGroup(path + ":" + version);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 3cfad81..cb9acad 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -28,13 +28,12 @@ import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
-import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
-import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcInvocation;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -81,8 +80,14 @@ public class DubboCodec extends ExchangeCodec {
                 if (status == Response.OK) {
                     Object data;
                     if (res.isEvent()) {
-                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                        data = decodeEventData(channel, in, is);
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                            // heart beat response data is always null;
+                            data = null;
+                        } else {
+                            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+                            data = decodeEventData(channel, in);
+                        }
                     } else {
                         DecodeableRpcResult result;
                         if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
@@ -120,8 +125,14 @@ public class DubboCodec extends ExchangeCodec {
             try {
                 Object data;
                 if (req.isEvent()) {
-                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                    data = decodeEventData(channel, in, is);
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                        // heart beat response data is always null;
+                        data = null;
+                    } else {
+                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+                        data = decodeEventData(channel, in);
+                    }
                 } else {
                     DecodeableRpcInvocation inv;
                     if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {