You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/03 04:46:52 UTC

[dubbo] 03/03: check serialization id on both side

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.7.8.1-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit e62014f96399cc0aadc9e3e3275b1d07e85122ba
Author: ken.lj <ke...@gmail.com>
AuthorDate: Wed Feb 3 12:45:25 2021 +0800

    check serialization id on both side
---
 .../dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml |  4 ++++
 .../dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml |  4 ++++
 .../remoting/exchange/codec/ExchangeCodec.java     | 21 ++++++++++-----------
 .../dubbo/remoting/transport/CodecSupport.java     | 22 +++++++++++++---------
 .../java/org/apache/dubbo/rpc/AppResponse.java     |  4 ++--
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  4 ++--
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |  7 +++++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       |  6 ++++--
 8 files changed, 46 insertions(+), 26 deletions(-)

diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
index b3a4acc..6a03ac7 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
@@ -83,5 +83,9 @@
             <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-serialization-hessian2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-jdk</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
index c4590c2..65ddaba 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
@@ -88,6 +88,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-jdk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-qos</artifactId>
         </dependency>
         <dependency>
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 9367b80..5638128 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.remoting.exchange.codec;
 
 import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.io.Bytes;
 import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.logger.Logger;
@@ -157,7 +158,7 @@ public class ExchangeCodec extends TelnetCodec {
                             // heart beat response data is always null;
                             data = null;
                         } else {
-                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                         }
                     } else {
                         data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
@@ -187,7 +188,7 @@ public class ExchangeCodec extends TelnetCodec {
                         // heart beat response data is always null;
                         data = null;
                     } else {
-                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
+                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                     }
                 } else {
                     data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
@@ -416,11 +417,14 @@ public class ExchangeCodec extends TelnetCodec {
         return decodeRequestData(channel, in);
     }
 
-    protected Object decodeEventData(Channel channel, ObjectInput in, InputStream is) throws IOException {
+    protected Object decodeEventData(Channel channel, ObjectInput in, byte[] eventBytes) throws IOException {
         try {
-            int dataLen = is.available();
-            if (dataLen > 100) {
-                throw new IllegalArgumentException("Event data too long, rejected by deserialization security check.");
+            if (eventBytes != null) {
+                int dataLen = eventBytes.length;
+                int threshold = ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size", 10);
+                if (dataLen > threshold) {
+                    throw new IllegalArgumentException("Event data too long, actual size " + dataLen + ", threshold " + threshold + " rejected for security consideration.");
+                }
             }
             return in.readEvent();
         } catch (IOException | ClassNotFoundException e) {
@@ -428,11 +432,6 @@ public class ExchangeCodec extends TelnetCodec {
         }
     }
 
-    @Deprecated
-    protected Object decodeHeartbeatData(Channel channel, ObjectInput in, InputStream is) throws IOException {
-        return decodeEventData(channel, in, is);
-    }
-
     protected Object decodeRequestData(Channel channel, ObjectInput in) throws IOException {
         return decodeRequestData(in);
     }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index 390b2e2..8592293 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -25,14 +25,10 @@ import org.apache.dubbo.common.serialize.ObjectInput;
 import org.apache.dubbo.common.serialize.ObjectOutput;
 import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ProviderModel;
 import org.apache.dubbo.rpc.model.ServiceRepository;
-import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceRepository;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -40,12 +36,10 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 public class CodecSupport {
-
     private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class);
     private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
     private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
@@ -53,6 +47,8 @@ public class CodecSupport {
     // Cache null object serialize results, for heartbeat request/response serialize use.
     private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
 
+    private static final ThreadLocal<byte[]> TL_BUFFER = ThreadLocal.withInitial(() -> new byte[1024]);
+
     static {
         Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
         for (String name : supportedExtensions) {
@@ -87,10 +83,10 @@ public class CodecSupport {
                 url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
     }
 
-    public static Serialization getSerialization(URL url, Byte id) {
+    public static Serialization getSerialization(URL url, Byte id) throws IOException {
         Serialization result = getSerializationById(id);
         if (result == null) {
-            result = getSerialization(url);
+            throw new IOException("Unrecognized serialize type from consumer: " + id);
         }
         return result;
     }
@@ -134,7 +130,7 @@ public class CodecSupport {
      */
     public static byte[] getPayload(InputStream is) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        byte[] buffer = new byte[1024];
+        byte[] buffer = getBuffer(is.available());
         int len;
         while ((len = is.read(buffer)) > -1) {
             baos.write(buffer, 0, len);
@@ -143,6 +139,14 @@ public class CodecSupport {
         return baos.toByteArray();
     }
 
+    private static byte[] getBuffer(int size) {
+        byte[] bytes = TL_BUFFER.get();
+        if (size <= bytes.length) {
+            return bytes;
+        }
+        return new byte[size];
+    }
+
     /**
      * Check if payload is null object serialize result byte[] of serialization
      *
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index c09e3b8..22538ac 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+import static org.apache.dubbo.rpc.Constants.INVOCATION_KEY;
 
 /**
  * {@link AsyncRpcResult} is introduced in 3.0.0 to replace RpcResult, and RpcResult is replaced with {@link AppResponse}:
@@ -63,7 +63,7 @@ public class AppResponse implements Result {
     }
 
     public AppResponse(Invocation invocation) {
-        this.attributes.put(SERIALIZATION_KEY, invocation);
+        this.setAttribute(INVOCATION_KEY, invocation);
     }
 
     public AppResponse(Object result) {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index d7e9a09..0db0f19 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -94,7 +94,7 @@ public class AsyncRpcResult implements Result {
             if (responseFuture.isDone()) {
                 responseFuture.get().setValue(value);
             } else {
-                AppResponse appResponse = new AppResponse();
+                AppResponse appResponse = new AppResponse(invocation);
                 appResponse.setValue(value);
                 responseFuture.complete(appResponse);
             }
@@ -116,7 +116,7 @@ public class AsyncRpcResult implements Result {
             if (responseFuture.isDone()) {
                 responseFuture.get().setException(t);
             } else {
-                AppResponse appResponse = new AppResponse();
+                AppResponse appResponse = new AppResponse(invocation);
                 appResponse.setException(t);
                 responseFuture.complete(appResponse);
             }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 0865f08..c06b720 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.InvokeMode;
@@ -44,6 +45,10 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.dubbo.remoting.Constants.DEFAULT_REMOTING_SERIALIZATION;
+import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+
 /**
  * This Invoker works on Consumer side.
  */
@@ -158,6 +163,8 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
         RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
 
+        invocation.put(SERIALIZATION_ID_KEY, CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION)));
+
         AsyncRpcResult asyncResult;
         try {
             asyncResult = (AsyncRpcResult) doInvoke(invocation);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index cb9acad..1686549 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -28,6 +28,8 @@ import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
+import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
@@ -86,7 +88,7 @@ public class DubboCodec extends ExchangeCodec {
                             data = null;
                         } else {
                             ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
-                            data = decodeEventData(channel, in);
+                            data = decodeEventData(channel, in, eventPayload);
                         }
                     } else {
                         DecodeableRpcResult result;
@@ -131,7 +133,7 @@ public class DubboCodec extends ExchangeCodec {
                         data = null;
                     } else {
                         ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
-                        data = decodeEventData(channel, in);
+                        data = decodeEventData(channel, in, eventPayload);
                     }
                 } else {
                     DecodeableRpcInvocation inv;