You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/03 04:46:49 UTC

[dubbo] branch 2.7.8.1-release updated (27571a0 -> e62014f)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a change to branch 2.7.8.1-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git.


    from 27571a0  [Unit Test] Use matrix build workflow & Add Integration Test (#7078)
     new 711343e  fix deserialization vulnerability
     new 1373fe0  Heartbeat skip serialize and deserialize (#7077)
     new e62014f  check serialization id on both side

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/dubbo/common/BaseServiceMetadata.java   |   4 +-
 .../dubbo/common/constants/CommonConstants.java    |   2 +
 .../dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml |   4 +
 .../dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml |   4 +
 .../remoting/exchange/codec/ExchangeCodec.java     | 110 ++++++++++++--------
 .../dubbo/remoting/transport/AbstractCodec.java    |  20 +++-
 .../dubbo/remoting/transport/CodecSupport.java     | 112 ++++++++++++++++++---
 .../dubbo/remoting/codec/ExchangeCodecTest.java    |  16 +++
 .../java/org/apache/dubbo/rpc/AppResponse.java     |  16 +++
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |   6 +-
 .../main/java/org/apache/dubbo/rpc/Constants.java  |   4 +
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |   7 ++
 .../dubbo/rpc/proxy/AbstractProxyInvoker.java      |   4 +-
 .../protocol/dubbo/DecodeableRpcInvocation.java    |  14 ++-
 .../rpc/protocol/dubbo/DecodeableRpcResult.java    |  12 +++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       |  34 ++++++-
 .../rpc/protocol/dubbo/DubboCodecSupport.java      |  53 ++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     |   3 +-
 18 files changed, 352 insertions(+), 73 deletions(-)
 create mode 100644 dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java


[dubbo] 03/03: check serialization id on both side

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.7.8.1-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit e62014f96399cc0aadc9e3e3275b1d07e85122ba
Author: ken.lj <ke...@gmail.com>
AuthorDate: Wed Feb 3 12:45:25 2021 +0800

    check serialization id on both side
---
 .../dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml |  4 ++++
 .../dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml |  4 ++++
 .../remoting/exchange/codec/ExchangeCodec.java     | 21 ++++++++++-----------
 .../dubbo/remoting/transport/CodecSupport.java     | 22 +++++++++++++---------
 .../java/org/apache/dubbo/rpc/AppResponse.java     |  4 ++--
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  4 ++--
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |  7 +++++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       |  6 ++++--
 8 files changed, 46 insertions(+), 26 deletions(-)

diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
index b3a4acc..6a03ac7 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
@@ -83,5 +83,9 @@
             <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-serialization-hessian2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-jdk</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
index c4590c2..65ddaba 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
@@ -88,6 +88,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-jdk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-qos</artifactId>
         </dependency>
         <dependency>
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 9367b80..5638128 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.remoting.exchange.codec;
 
 import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.io.Bytes;
 import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.logger.Logger;
@@ -157,7 +158,7 @@ public class ExchangeCodec extends TelnetCodec {
                             // heart beat response data is always null;
                             data = null;
                         } else {
-                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                         }
                     } else {
                         data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
@@ -187,7 +188,7 @@ public class ExchangeCodec extends TelnetCodec {
                         // heart beat response data is always null;
                         data = null;
                     } else {
-                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                     }
                 } else {
                     data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
@@ -416,11 +417,14 @@ public class ExchangeCodec extends TelnetCodec {
         return decodeRequestData(channel, in);
     }
 
-    protected Object decodeEventData(Channel channel, ObjectInput in, InputStream is) throws IOException {
+    protected Object decodeEventData(Channel channel, ObjectInput in, byte[] eventBytes) throws IOException {
         try {
-            int dataLen = is.available();
-            if (dataLen > 100) {
-                throw new IllegalArgumentException("Event data too long, rejected by deserialization security check.");
+            if (eventBytes != null) {
+                int dataLen = eventBytes.length;
+                int threshold = ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size", 10);
+                if (dataLen > threshold) {
+                    throw new IllegalArgumentException("Event data too long, actual size " + threshold + ", threshold " + threshold + " rejected for security consideration.");
+                }
             }
             return in.readEvent();
         } catch (IOException | ClassNotFoundException e) {
@@ -428,11 +432,6 @@ public class ExchangeCodec extends TelnetCodec {
         }
     }
 
-    @Deprecated
-    protected Object decodeHeartbeatData(Channel channel, ObjectInput in, InputStream is) throws IOException {
-        return decodeEventData(channel, in, is);
-    }
-
     protected Object decodeRequestData(Channel channel, ObjectInput in) throws IOException {
         return decodeRequestData(in);
     }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index 390b2e2..8592293 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -25,14 +25,10 @@ import org.apache.dubbo.common.serialize.ObjectInput;
 import org.apache.dubbo.common.serialize.ObjectOutput;
 import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ProviderModel;
 import org.apache.dubbo.rpc.model.ServiceRepository;
-import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceRepository;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -40,12 +36,10 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 public class CodecSupport {
-
     private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class);
     private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
     private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
@@ -53,6 +47,8 @@ public class CodecSupport {
     // Cache null object serialize results, for heartbeat request/response serialize use.
     private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
 
+    private static final ThreadLocal<byte[]> TL_BUFFER = ThreadLocal.withInitial(() -> new byte[1024]);
+
     static {
         Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
         for (String name : supportedExtensions) {
@@ -87,10 +83,10 @@ public class CodecSupport {
                 url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
     }
 
-    public static Serialization getSerialization(URL url, Byte id) {
+    public static Serialization getSerialization(URL url, Byte id) throws IOException {
         Serialization result = getSerializationById(id);
         if (result == null) {
-            result = getSerialization(url);
+            throw new IOException("Unrecognized serialize type from consumer: " + id);
         }
         return result;
     }
@@ -134,7 +130,7 @@ public class CodecSupport {
      */
     public static byte[] getPayload(InputStream is) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        byte[] buffer = new byte[1024];
+        byte[] buffer = getBuffer(is.available());
         int len;
         while ((len = is.read(buffer)) > -1) {
             baos.write(buffer, 0, len);
@@ -143,6 +139,14 @@ public class CodecSupport {
         return baos.toByteArray();
     }
 
+    private static byte[] getBuffer(int size) {
+        byte[] bytes = TL_BUFFER.get();
+        if (size <= bytes.length) {
+            return bytes;
+        }
+        return new byte[size];
+    }
+
     /**
      * Check if payload is null object serialize result byte[] of serialization
      *
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index c09e3b8..22538ac 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+import static org.apache.dubbo.rpc.Constants.INVOCATION_KEY;
 
 /**
  * {@link AsyncRpcResult} is introduced in 3.0.0 to replace RpcResult, and RpcResult is replaced with {@link AppResponse}:
@@ -63,7 +63,7 @@ public class AppResponse implements Result {
     }
 
     public AppResponse(Invocation invocation) {
-        this.attributes.put(SERIALIZATION_KEY, invocation);
+        this.setAttribute(INVOCATION_KEY, invocation);
     }
 
     public AppResponse(Object result) {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index d7e9a09..0db0f19 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -94,7 +94,7 @@ public class AsyncRpcResult implements Result {
             if (responseFuture.isDone()) {
                 responseFuture.get().setValue(value);
             } else {
-                AppResponse appResponse = new AppResponse();
+                AppResponse appResponse = new AppResponse(invocation);
                 appResponse.setValue(value);
                 responseFuture.complete(appResponse);
             }
@@ -116,7 +116,7 @@ public class AsyncRpcResult implements Result {
             if (responseFuture.isDone()) {
                 responseFuture.get().setException(t);
             } else {
-                AppResponse appResponse = new AppResponse();
+                AppResponse appResponse = new AppResponse(invocation);
                 appResponse.setException(t);
                 responseFuture.complete(appResponse);
             }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 0865f08..c06b720 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.InvokeMode;
@@ -44,6 +45,10 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.dubbo.remoting.Constants.DEFAULT_REMOTING_SERIALIZATION;
+import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+
 /**
  * This Invoker works on Consumer side.
  */
@@ -158,6 +163,8 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
         RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
 
+        invocation.put(SERIALIZATION_ID_KEY, CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION)));
+
         AsyncRpcResult asyncResult;
         try {
             asyncResult = (AsyncRpcResult) doInvoke(invocation);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index cb9acad..1686549 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -28,6 +28,8 @@ import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
+import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
@@ -86,7 +88,7 @@ public class DubboCodec extends ExchangeCodec {
                             data = null;
                         } else {
                             ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
-                            data = decodeEventData(channel, in);
+                            data = decodeEventData(channel, in, eventPayload);
                         }
                     } else {
                         DecodeableRpcResult result;
@@ -131,7 +133,7 @@ public class DubboCodec extends ExchangeCodec {
                         data = null;
                     } else {
                         ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
-                        data = decodeEventData(channel, in);
+                        data = decodeEventData(channel, in, eventPayload);
                     }
                 } else {
                     DecodeableRpcInvocation inv;


[dubbo] 02/03: Heartbeat skip serialize and deserialize (#7077)

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.7.8.1-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 1373fe0401d22d203f398b94f00bf63213973663
Author: 张远征hd <ho...@163.com>
AuthorDate: Tue Feb 2 14:49:57 2021 +0800

    Heartbeat skip serialize and deserialize (#7077)
---
 .../remoting/exchange/codec/ExchangeCodec.java     | 86 ++++++++++++++--------
 .../dubbo/remoting/transport/CodecSupport.java     | 64 ++++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       | 23 ++++--
 3 files changed, 137 insertions(+), 36 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 128ffe6..9367b80 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -38,6 +38,7 @@ import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
 import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -148,19 +149,22 @@ public class ExchangeCodec extends TelnetCodec {
             byte status = header[3];
             res.setStatus(status);
             try {
-                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 if (status == Response.OK) {
                     Object data;
-                    if (res.isHeartbeat()) {
-                        data = decodeHeartbeatData(channel, in, is);
-                    } else if (res.isEvent()) {
-                        data = decodeEventData(channel, in, is);
+                    if (res.isEvent()) {
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                            // heart beat response data is always null;
+                            data = null;
+                        } else {
+                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                        }
                     } else {
-                        data = decodeResponseData(channel, in, getRequestData(id));
+                        data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
                     }
                     res.setResult(data);
                 } else {
-                    res.setErrorMessage(in.readUTF());
+                    res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
                 }
             } catch (Throwable t) {
                 res.setStatus(Response.CLIENT_ERROR);
@@ -176,14 +180,17 @@ public class ExchangeCodec extends TelnetCodec {
                 req.setEvent(true);
             }
             try {
-                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 Object data;
-                if (req.isHeartbeat()) {
-                    data = decodeHeartbeatData(channel, in, is);
-                } else if (req.isEvent()) {
-                    data = decodeEventData(channel, in, is);
+                if (req.isEvent()) {
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                        // heart beat response data is always null;
+                        data = null;
+                    } else {
+                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                    }
                 } else {
-                    data = decodeRequestData(channel, in);
+                    data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
                 }
                 req.setData(data);
             } catch (Throwable t) {
@@ -231,16 +238,23 @@ public class ExchangeCodec extends TelnetCodec {
         int savedWriteIndex = buffer.writerIndex();
         buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
         ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
-        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
-        if (req.isEvent()) {
-            encodeEventData(channel, out, req.getData());
+
+        if (req.isHeartbeat()) {
+            // heartbeat request data is always null
+            bos.write(CodecSupport.getNullBytesOf(serialization));
         } else {
-            encodeRequestData(channel, out, req.getData(), req.getVersion());
-        }
-        out.flushBuffer();
-        if (out instanceof Cleanable) {
-            ((Cleanable) out).cleanup();
+            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+            if (req.isEvent()) {
+                encodeEventData(channel, out, req.getData());
+            } else {
+                encodeRequestData(channel, out, req.getData(), req.getVersion());
+            }
+            out.flushBuffer();
+            if (out instanceof Cleanable) {
+                ((Cleanable) out).cleanup();
+            }
         }
+
         bos.flush();
         bos.close();
         int len = bos.writtenBytes();
@@ -274,21 +288,33 @@ public class ExchangeCodec extends TelnetCodec {
 
             buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
             ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
-            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+
             // encode response data or error message.
             if (status == Response.OK) {
-                if (res.isHeartbeat()) {
-                    encodeEventData(channel, out, res.getResult());
-                } else {
-                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
+                if(res.isHeartbeat()){
+                    // heartbeat response data is always null
+                    bos.write(CodecSupport.getNullBytesOf(serialization));
+                }else {
+                    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
+                    if (res.isEvent()) {
+                        encodeEventData(channel, out, res.getResult());
+                    } else {
+                        encodeResponseData(channel, out, res.getResult(), res.getVersion());
+                    }
+                    out.flushBuffer();
+                    if (out instanceof Cleanable) {
+                        ((Cleanable) out).cleanup();
+                    }
                 }
             } else {
+                ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
                 out.writeUTF(res.getErrorMessage());
+                out.flushBuffer();
+                if (out instanceof Cleanable) {
+                    ((Cleanable) out).cleanup();
+                }
             }
-            out.flushBuffer();
-            if (out instanceof Cleanable) {
-                ((Cleanable) out).cleanup();
-            }
+
             bos.flush();
             bos.close();
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index d8609b4..390b2e2 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -22,17 +22,25 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
 import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ProviderModel;
 import org.apache.dubbo.rpc.model.ServiceRepository;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ServiceRepository;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,6 +50,8 @@ public class CodecSupport {
     private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
     private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
     private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();
+    // Cache null object serialize results, for heartbeat request/response serialize use.
+    private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
 
     static {
         Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
@@ -90,6 +100,60 @@ public class CodecSupport {
         return s.deserialize(url, is);
     }
 
+    /**
+     * Get the null object serialize result byte[] of Serialization from the cache,
+     * if not, generate it first.
+     *
+     * @param s Serialization Instances
+     * @return serialize result of null object
+     */
+    public static byte[] getNullBytesOf(Serialization s) {
+        return ID_NULLBYTES_MAP.computeIfAbsent(s.getContentTypeId(), k -> {
+            //Pre-generated Null object bytes
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            byte[] nullBytes = new byte[0];
+            try {
+                ObjectOutput out = s.serialize(null, baos);
+                out.writeObject(null);
+                out.flushBuffer();
+                nullBytes = baos.toByteArray();
+                baos.close();
+            } catch (Exception e) {
+                logger.warn("Serialization extension " + s.getClass().getName() + " not support serializing null object, return an empty bytes instead.");
+            }
+            return nullBytes;
+        });
+    }
+
+    /**
+     * Read all payload to byte[]
+     *
+     * @param is
+     * @return
+     * @throws IOException
+     */
+    public static byte[] getPayload(InputStream is) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int len;
+        while ((len = is.read(buffer)) > -1) {
+            baos.write(buffer, 0, len);
+        }
+        baos.flush();
+        return baos.toByteArray();
+    }
+
+    /**
+     * Check if payload is null object serialize result byte[] of serialization
+     *
+     * @param payload
+     * @param proto
+     * @return
+     */
+    public static boolean isHeartBeat(byte[] payload, byte proto) {
+        return Arrays.equals(payload, getNullBytesOf(getSerializationById(proto)));
+    }
+
     public static void checkSerialization(String path, String version, Byte id) throws IOException {
         ServiceRepository repository = ApplicationModel.getServiceRepository();
         ProviderModel providerModel = repository.lookupExportedServiceWithoutGroup(path + ":" + version);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 3cfad81..cb9acad 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -28,13 +28,12 @@ import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
-import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
-import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcInvocation;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -81,8 +80,14 @@ public class DubboCodec extends ExchangeCodec {
                 if (status == Response.OK) {
                     Object data;
                     if (res.isEvent()) {
-                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                        data = decodeEventData(channel, in, is);
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                            // heart beat response data is always null;
+                            data = null;
+                        } else {
+                            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+                            data = decodeEventData(channel, in);
+                        }
                     } else {
                         DecodeableRpcResult result;
                         if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
@@ -120,8 +125,14 @@ public class DubboCodec extends ExchangeCodec {
             try {
                 Object data;
                 if (req.isEvent()) {
-                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                    data = decodeEventData(channel, in, is);
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
+                        // heart beat response data is always null;
+                        data = null;
+                    } else {
+                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
+                        data = decodeEventData(channel, in);
+                    }
                 } else {
                     DecodeableRpcInvocation inv;
                     if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {


[dubbo] 01/03: fix deserialization vulnerability

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.7.8.1-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 711343eda70a7597182d024111423a8aba9a4b14
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Feb 2 14:56:18 2021 +0800

    fix deserialization vulnerability
---
 .../apache/dubbo/common/BaseServiceMetadata.java   |  4 +-
 .../dubbo/common/constants/CommonConstants.java    |  2 +
 .../remoting/exchange/codec/ExchangeCodec.java     | 27 ++++++-----
 .../dubbo/remoting/transport/AbstractCodec.java    | 20 ++++++--
 .../dubbo/remoting/transport/CodecSupport.java     | 44 ++++++++++++------
 .../dubbo/remoting/codec/ExchangeCodecTest.java    | 16 +++++++
 .../java/org/apache/dubbo/rpc/AppResponse.java     | 16 +++++++
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  2 +-
 .../main/java/org/apache/dubbo/rpc/Constants.java  |  4 ++
 .../dubbo/rpc/proxy/AbstractProxyInvoker.java      |  4 +-
 .../protocol/dubbo/DecodeableRpcInvocation.java    | 14 +++++-
 .../rpc/protocol/dubbo/DecodeableRpcResult.java    | 12 +++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       | 17 ++++++-
 .../rpc/protocol/dubbo/DubboCodecSupport.java      | 53 ++++++++++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     |  3 +-
 15 files changed, 198 insertions(+), 40 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java b/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
index 9d65ffd..6bf4491 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
@@ -18,6 +18,8 @@ package org.apache.dubbo.common;
 
 import org.apache.dubbo.common.utils.StringUtils;
 
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
+
 /**
  * 2019-10-10
  */
@@ -44,7 +46,7 @@ public class BaseServiceMetadata {
     public static String versionFromServiceKey(String serviceKey) {
         int index = serviceKey.indexOf(":");
         if (index == -1) {
-            return null;
+            return DEFAULT_VERSION;
         }
         return serviceKey.substring(index + 1);
     }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 8ff7338..4d81bf9 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -364,4 +364,6 @@ public interface CommonConstants {
     String SENTINEL_REDIS = "sentinel";
 
     String CLUSTER_REDIS = "cluster";
+
+    String DEFAULT_VERSION = "0.0.0";
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 1ea6c10..128ffe6 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -152,9 +152,9 @@ public class ExchangeCodec extends TelnetCodec {
                 if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
-                        data = decodeHeartbeatData(channel, in);
+                        data = decodeHeartbeatData(channel, in, is);
                     } else if (res.isEvent()) {
-                        data = decodeEventData(channel, in);
+                        data = decodeEventData(channel, in, is);
                     } else {
                         data = decodeResponseData(channel, in, getRequestData(id));
                     }
@@ -179,9 +179,9 @@ public class ExchangeCodec extends TelnetCodec {
                 ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 Object data;
                 if (req.isHeartbeat()) {
-                    data = decodeHeartbeatData(channel, in);
+                    data = decodeHeartbeatData(channel, in, is);
                 } else if (req.isEvent()) {
-                    data = decodeEventData(channel, in);
+                    data = decodeEventData(channel, in, is);
                 } else {
                     data = decodeRequestData(channel, in);
                 }
@@ -208,7 +208,7 @@ public class ExchangeCodec extends TelnetCodec {
     }
 
     protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
-        Serialization serialization = getSerialization(channel);
+        Serialization serialization = getSerialization(channel, req);
         // header.
         byte[] header = new byte[HEADER_LENGTH];
         // set magic number.
@@ -256,7 +256,7 @@ public class ExchangeCodec extends TelnetCodec {
     protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
         int savedWriteIndex = buffer.writerIndex();
         try {
-            Serialization serialization = getSerialization(channel);
+            Serialization serialization = getSerialization(channel, res);
             // header.
             byte[] header = new byte[HEADER_LENGTH];
             // set magic number.
@@ -347,11 +347,6 @@ public class ExchangeCodec extends TelnetCodec {
         return decodeRequestData(in);
     }
 
-    @Deprecated
-    protected Object decodeHeartbeatData(ObjectInput in) throws IOException {
-        return decodeEventData(null, in);
-    }
-
     protected Object decodeRequestData(ObjectInput in) throws IOException {
         try {
             return in.readObject();
@@ -395,8 +390,12 @@ public class ExchangeCodec extends TelnetCodec {
         return decodeRequestData(channel, in);
     }
 
-    protected Object decodeEventData(Channel channel, ObjectInput in) throws IOException {
+    protected Object decodeEventData(Channel channel, ObjectInput in, InputStream is) throws IOException {
         try {
+            int dataLen = is.available();
+            if (dataLen > 100) {
+                throw new IllegalArgumentException("Event data too long, rejected by deserialization security check.");
+            }
             return in.readEvent();
         } catch (IOException | ClassNotFoundException e) {
             throw new IOException(StringUtils.toString("Decode dubbo protocol event failed.", e));
@@ -404,8 +403,8 @@ public class ExchangeCodec extends TelnetCodec {
     }
 
     @Deprecated
-    protected Object decodeHeartbeatData(Channel channel, ObjectInput in) throws IOException {
-        return decodeEventData(channel, in);
+    protected Object decodeHeartbeatData(Channel channel, ObjectInput in, InputStream is) throws IOException {
+        return decodeEventData(channel, in, is);
     }
 
     protected Object decodeRequestData(Channel channel, ObjectInput in) throws IOException {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
index 9be9663..e84c28e 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
@@ -16,9 +16,6 @@
  */
 package org.apache.dubbo.remoting.transport;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
@@ -27,6 +24,11 @@ import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.Codec2;
 import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
 
@@ -48,18 +50,26 @@ public abstract class AbstractCodec implements Codec2 {
         }
         if (payload > 0 && size > payload) {
             ExceedPayloadLimitException e = new ExceedPayloadLimitException(
-                "Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
+                    "Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
             logger.error(e);
             throw e;
         }
     }
 
+    protected Serialization getSerialization(Channel channel, Request req) {
+        return CodecSupport.getSerialization(channel.getUrl());
+    }
+
+    protected Serialization getSerialization(Channel channel, Response res) {
+        return CodecSupport.getSerialization(channel.getUrl());
+    }
+
     protected Serialization getSerialization(Channel channel) {
         return CodecSupport.getSerialization(channel.getUrl());
     }
 
     protected boolean isClientSide(Channel channel) {
-        String side = (String)channel.getAttribute(SIDE_KEY);
+        String side = (String) channel.getAttribute(SIDE_KEY);
         if (CLIENT_SIDE.equals(side)) {
             return true;
         } else if (SERVER_SIDE.equals(side)) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index 8c74fe5..d8609b4 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -23,18 +23,19 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.ObjectInput;
 import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ServiceRepository;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.dubbo.common.serialize.Constants.COMPACTED_JAVA_SERIALIZATION_ID;
-import static org.apache.dubbo.common.serialize.Constants.JAVA_SERIALIZATION_ID;
-import static org.apache.dubbo.common.serialize.Constants.NATIVE_JAVA_SERIALIZATION_ID;
-
 public class CodecSupport {
 
     private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class);
@@ -76,20 +77,37 @@ public class CodecSupport {
                 url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
     }
 
-    public static Serialization getSerialization(URL url, Byte id) throws IOException {
-        Serialization serialization = getSerializationById(id);
-        String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
-        // Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
-        if (serialization == null
-                || ((id == JAVA_SERIALIZATION_ID || id == NATIVE_JAVA_SERIALIZATION_ID || id == COMPACTED_JAVA_SERIALIZATION_ID)
-                && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
-            throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
+    public static Serialization getSerialization(URL url, Byte id) {
+        Serialization result = getSerializationById(id);
+        if (result == null) {
+            result = getSerialization(url);
         }
-        return serialization;
+        return result;
     }
 
     public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
         Serialization s = getSerialization(url, proto);
         return s.deserialize(url, is);
     }
+
+    public static void checkSerialization(String path, String version, Byte id) throws IOException {
+        ServiceRepository repository = ApplicationModel.getServiceRepository();
+        ProviderModel providerModel = repository.lookupExportedServiceWithoutGroup(path + ":" + version);
+        if (providerModel == null) {
+            if (logger.isWarnEnabled()) {
+                logger.warn("Serialization security check is enabled but cannot work as expected because " +
+                        "there's no matched provider model for path " + path + ", version " + version);
+            }
+        } else {
+            List<URL> urls = providerModel.getServiceConfig().getExportedUrls();
+            if (CollectionUtils.isNotEmpty(urls)) {
+                URL url = urls.get(0);
+                String serializationName = url.getParameter(org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
+                Byte localId = SERIALIZATIONNAME_ID_MAP.get(serializationName);
+                if (localId != null && !localId.equals(id)) {
+                    throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
+                }
+            }
+        }
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
index cb99fe3..3133735 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -96,6 +96,22 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         return request;
     }
 
+    private byte[] getReadonlyEventRequestBytes(Object obj, byte[] header) throws IOException {
+        // encode request data.
+        UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream(1024);
+        ObjectOutput out = serialization.serialize(url, bos);
+        out.writeObject(obj);
+
+        out.flushBuffer();
+        bos.flush();
+        bos.close();
+        byte[] data = bos.toByteArray();
+//        byte[] len = Bytes.int2bytes(data.length);
+        System.arraycopy(data, 0, header, 12, data.length);
+        byte[] request = join(header, data);
+        return request;
+    }
+
     private byte[] assemblyDataProtocol(byte[] header) {
         Person request = new Person();
         byte[] newbuf = join(header, objectToByte(request));
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index 7ebff3f..c09e3b8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+
 /**
  * {@link AsyncRpcResult} is introduced in 3.0.0 to replace RpcResult, and RpcResult is replaced with {@link AppResponse}:
  * <ul>
@@ -55,9 +57,15 @@ public class AppResponse implements Result {
 
     private Map<String, Object> attachments = new HashMap<>();
 
+    private Map<String, Object> attributes = new HashMap<>();
+
     public AppResponse() {
     }
 
+    public AppResponse(Invocation invocation) {
+        this.attributes.put(SERIALIZATION_KEY, invocation);
+    }
+
     public AppResponse(Object result) {
         this.result = result;
     }
@@ -212,6 +220,14 @@ public class AppResponse implements Result {
         attachments.put(key, value);
     }
 
+    public Object getAttribute(String key) {
+        return attributes.get(key);
+    }
+
+    public void setAttribute(String key, Object value) {
+        attributes.put(key, value);
+    }
+
     @Override
     public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
         throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index d61d48f..d7e9a09 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -319,7 +319,7 @@ public class AsyncRpcResult implements Result {
 
     public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
         CompletableFuture<AppResponse> future = new CompletableFuture<>();
-        AppResponse result = new AppResponse();
+        AppResponse result = new AppResponse(invocation);
         if (t != null) {
             result.setException(t);
         } else {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index f5fd982..a544fa4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -90,4 +90,8 @@ public interface Constants {
 
     String CONSUMER_MODEL = "consumerModel";
     String METHOD_MODEL = "methodModel";
+
+    String SERIALIZATION_SECURITY_CHECK_KEY = "serialization.security.check";
+    String INVOCATION_KEY = "invocation";
+    String SERIALIZATION_ID_KEY = "serialization_id";
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 1b1d592..79168c9 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -82,9 +82,9 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
     public Result invoke(Invocation invocation) throws RpcException {
         try {
             Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
-			CompletableFuture<Object> future = wrapWithFuture(value);
+            CompletableFuture<Object> future = wrapWithFuture(value);
             CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
-                AppResponse result = new AppResponse();
+                AppResponse result = new AppResponse(invocation);
                 if (t != null) {
                     if (t instanceof CompletionException) {
                         result.setException(t.getCause());
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index f286ddb..ebfc41c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.protocol.dubbo;
 
 
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.Cleanable;
@@ -47,6 +48,8 @@ import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KE
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY;
 import static org.apache.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.decodeInvocationArgument;
 
 public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
@@ -95,10 +98,15 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
         throw new UnsupportedOperationException();
     }
 
+    private void checkSerializationTypeFromRemote() {
+
+    }
+
     @Override
     public Object decode(Channel channel, InputStream input) throws IOException {
         ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                 .deserialize(channel.getUrl(), input);
+        this.put(SERIALIZATION_ID_KEY, serializationType);
 
         String dubboVersion = in.readUTF();
         request.setVersion(dubboVersion);
@@ -106,7 +114,8 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
 
         String path = in.readUTF();
         setAttachment(PATH_KEY, path);
-        setAttachment(VERSION_KEY, in.readUTF());
+        String version = in.readUTF();
+        setAttachment(VERSION_KEY, version);
 
         setMethodName(in.readUTF());
 
@@ -114,6 +123,9 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
         setParameterTypesDesc(desc);
 
         try {
+            if (ConfigurationUtils.getSystemConfiguration().getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false)) {
+                CodecSupport.checkSerialization(path, version, serializationType);
+            }
             Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
             Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
             if (desc.length() > 0) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index 83db986..3ac302d 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol.dubbo;
 
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.Cleanable;
@@ -38,6 +39,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Type;
 
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY;
+
 public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable {
 
     private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);
@@ -114,6 +118,14 @@ public class DecodeableRpcResult extends AppResponse implements Codec, Decodeabl
     public void decode() throws Exception {
         if (!hasDecoded && channel != null && inputStream != null) {
             try {
+                if (ConfigurationUtils.getSystemConfiguration().getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false)) {
+                    Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+                    if (serializationType_obj != null) {
+                        if ((byte) serializationType_obj != serializationType) {
+                            throw new IOException("Unexpected serialization id:" + serializationType + " received from network, please check if the peer send the right id.");
+                        }
+                    }
+                }
                 decode(channel, inputStream);
             } catch (Throwable e) {
                 if (log.isWarnEnabled()) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 889f2ce..3cfad81 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -23,12 +23,14 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.ObjectInput;
 import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
 import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcInvocation;
@@ -80,7 +82,7 @@ public class DubboCodec extends ExchangeCodec {
                     Object data;
                     if (res.isEvent()) {
                         ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                        data = decodeEventData(channel, in);
+                        data = decodeEventData(channel, in, is);
                     } else {
                         DecodeableRpcResult result;
                         if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
@@ -119,7 +121,7 @@ public class DubboCodec extends ExchangeCodec {
                 Object data;
                 if (req.isEvent()) {
                     ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
-                    data = decodeEventData(channel, in);
+                    data = decodeEventData(channel, in, is);
                 } else {
                     DecodeableRpcInvocation inv;
                     if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
@@ -213,4 +215,15 @@ public class DubboCodec extends ExchangeCodec {
             out.writeAttachments(result.getObjectAttachments());
         }
     }
+
+    @Override
+    protected Serialization getSerialization(Channel channel, Request req) {
+        return DubboCodecSupport.getRequestSerialization(channel.getUrl(), (Invocation) req.getData());
+    }
+
+    @Override
+    protected Serialization getSerialization(Channel channel, Response res) {
+        return DubboCodecSupport.getResponseSerialization(channel.getUrl(), (AppResponse) res.getResult());
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java
new file mode 100644
index 0000000..2dbb312
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.Invocation;
+
+import static org.apache.dubbo.rpc.Constants.INVOCATION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+
+public class DubboCodecSupport {
+
+    public static Serialization getRequestSerialization(URL url, Invocation invocation) {
+        Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+        if (serializationType_obj != null) {
+            return CodecSupport.getSerializationById((byte) serializationType_obj);
+        }
+        return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
+                url.getParameter(org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+    }
+
+    public static Serialization getResponseSerialization(URL url, AppResponse appResponse) {
+        Object invocation_obj = appResponse.getAttribute(INVOCATION_KEY);
+        if (invocation_obj != null) {
+            Invocation invocation = (Invocation) invocation_obj;
+            Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+            if (serializationType_obj != null) {
+                return CodecSupport.getSerializationById((byte) serializationType_obj);
+            }
+        }
+        return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
+                url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 4fc52f7..c4c9474 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
 import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
@@ -76,7 +77,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
         super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY, TOKEN_KEY});
         this.clients = clients;
         // get version.
-        this.version = url.getParameter(VERSION_KEY, "0.0.0");
+        this.version = url.getParameter(VERSION_KEY, DEFAULT_VERSION);
         this.invokers = invokers;
     }