You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by il...@apache.org on 2018/10/23 04:11:39 UTC

[incubator-dubbo] branch 2.6.x updated: #1903: merge issue 1903's fix from 2.7.0 to 2.6.x (#2668)

This is an automated email from the ASF dual-hosted git repository.

iluo pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new a531a5c      #1903: merge issue 1903's fix from 2.7.0 to 2.6.x (#2668)
a531a5c is described below

commit a531a5cb9ace8f06a6192d65756d828ab8767f47
Author: Ian Luo <ia...@gmail.com>
AuthorDate: Tue Oct 23 12:11:34 2018 +0800

        #1903: merge issue 1903's fix from 2.7.0 to 2.6.x (#2668)
---
 .../remoting/exchange/codec/ExchangeCodec.java     | 18 +++++-----
 .../dubbo/remoting/transport/CodecSupport.java     |  7 ++++
 .../dubbo/remoting/codec/ExchangeCodecTest.java    | 17 ++++++++++
 .../transport/codec/DeprecatedExchangeCodec.java   | 18 +++++-----
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       | 38 +++++++++-------------
 5 files changed, 58 insertions(+), 40 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
index c1b8e09..f1e2bb8 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -139,8 +139,6 @@ public class ExchangeCodec extends TelnetCodec {
 
     protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
         byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
-        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
-        ObjectInput in = s.deserialize(channel.getUrl(), is);
         // get request id.
         long id = Bytes.bytes2long(header, 4);
         if ((flag & FLAG_REQUEST) == 0) {
@@ -152,8 +150,9 @@ public class ExchangeCodec extends TelnetCodec {
             // get status.
             byte status = header[3];
             res.setStatus(status);
-            if (status == Response.OK) {
-                try {
+            try {
+                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
+                if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
                         data = decodeHeartbeatData(channel, in);
@@ -163,12 +162,12 @@ public class ExchangeCodec extends TelnetCodec {
                         data = decodeResponseData(channel, in, getRequestData(id));
                     }
                     res.setResult(data);
-                } catch (Throwable t) {
-                    res.setStatus(Response.CLIENT_ERROR);
-                    res.setErrorMessage(StringUtils.toString(t));
+                } else {
+                    res.setErrorMessage(in.readUTF());
                 }
-            } else {
-                res.setErrorMessage(in.readUTF());
+            } catch (Throwable t) {
+                res.setStatus(Response.CLIENT_ERROR);
+                res.setErrorMessage(StringUtils.toString(t));
             }
             return res;
         } else {
@@ -180,6 +179,7 @@ public class ExchangeCodec extends TelnetCodec {
                 req.setEvent(Request.HEARTBEAT_EVENT);
             }
             try {
+                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 Object data;
                 if (req.isHeartbeat()) {
                     data = decodeHeartbeatData(channel, in);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
index a9b0c1a..3262e12 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
@@ -22,9 +22,11 @@ import com.alibaba.dubbo.common.URL;
 import com.alibaba.dubbo.common.extension.ExtensionLoader;
 import com.alibaba.dubbo.common.logger.Logger;
 import com.alibaba.dubbo.common.logger.LoggerFactory;
+import com.alibaba.dubbo.common.serialize.ObjectInput;
 import com.alibaba.dubbo.common.serialize.Serialization;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -75,4 +77,9 @@ public class CodecSupport {
         return serialization;
     }
 
+    public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
+        Serialization s = getSerialization(url, proto);
+        return s.deserialize(url, is);
+    }
+
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
index e372e14..163c72c 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -148,6 +148,23 @@ public class ExchangeCodecTest extends TelnetCodecTest {
     }
 
     @Test
+    public void testInvalidSerializaitonId() throws Exception {
+        byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x8F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+        Object obj =  decode(header);
+        Assert.assertTrue(obj instanceof Request);
+        Request request = (Request) obj;
+        Assert.assertTrue(request.isBroken());
+        Assert.assertTrue(request.getData() instanceof IOException);
+        header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x1F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+
+        obj = decode(header);
+        Assert.assertTrue(obj instanceof Response);
+        Response response = (Response) obj;
+        Assert.assertEquals(response.getStatus(), Response.CLIENT_ERROR);
+        Assert.assertTrue(response.getErrorMessage().contains("IOException"));
+    }
+
+    @Test
     public void test_Decode_Check_Payload() throws IOException {
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
         byte[] request = assemblyDataProtocol(header);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
index 2b44ada..bdbd280 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
@@ -130,8 +130,6 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
 
     protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
         byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
-        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
-        ObjectInput in = s.deserialize(channel.getUrl(), is);
         // get request id.
         long id = Bytes.bytes2long(header, 4);
         if ((flag & FLAG_REQUEST) == 0) {
@@ -143,8 +141,9 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
             // get status.
             byte status = header[3];
             res.setStatus(status);
-            if (status == Response.OK) {
-                try {
+            try {
+                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
+                if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
                         data = decodeHeartbeatData(channel, in);
@@ -154,12 +153,12 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
                         data = decodeResponseData(channel, in, getRequestData(id));
                     }
                     res.setResult(data);
-                } catch (Throwable t) {
-                    res.setStatus(Response.CLIENT_ERROR);
-                    res.setErrorMessage(StringUtils.toString(t));
+                } else {
+                    res.setErrorMessage(in.readUTF());
                 }
-            } else {
-                res.setErrorMessage(in.readUTF());
+            } catch (Throwable t) {
+                res.setStatus(Response.CLIENT_ERROR);
+                res.setErrorMessage(StringUtils.toString(t));
             }
             return res;
         } else {
@@ -171,6 +170,7 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
                 req.setEvent(Request.HEARTBEAT_EVENT);
             }
             try {
+                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 Object data;
                 if (req.isHeartbeat()) {
                     data = decodeHeartbeatData(channel, in);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 80e4209..b159191 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -17,7 +17,6 @@
 package com.alibaba.dubbo.rpc.protocol.dubbo;
 
 import com.alibaba.dubbo.common.Constants;
-import com.alibaba.dubbo.common.URL;
 import com.alibaba.dubbo.common.Version;
 import com.alibaba.dubbo.common.io.Bytes;
 import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
@@ -25,7 +24,6 @@ import com.alibaba.dubbo.common.logger.Logger;
 import com.alibaba.dubbo.common.logger.LoggerFactory;
 import com.alibaba.dubbo.common.serialize.ObjectInput;
 import com.alibaba.dubbo.common.serialize.ObjectOutput;
-import com.alibaba.dubbo.common.serialize.Serialization;
 import com.alibaba.dubbo.common.utils.ReflectUtils;
 import com.alibaba.dubbo.common.utils.StringUtils;
 import com.alibaba.dubbo.remoting.Channel;
@@ -63,7 +61,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
     @Override
     protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
         byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
-        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
         // get request id.
         long id = Bytes.bytes2long(header, 4);
         if ((flag & FLAG_REQUEST) == 0) {
@@ -75,13 +72,14 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
             // get status.
             byte status = header[3];
             res.setStatus(status);
-            if (status == Response.OK) {
-                try {
+            try {
+                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
+                if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
-                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
+                        data = decodeHeartbeatData(channel, in);
                     } else if (res.isEvent()) {
-                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
+                        data = decodeEventData(channel, in);
                     } else {
                         DecodeableRpcResult result;
                         if (channel.getUrl().getParameter(
@@ -98,15 +96,15 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
                         data = result;
                     }
                     res.setResult(data);
-                } catch (Throwable t) {
-                    if (log.isWarnEnabled()) {
-                        log.warn("Decode response failed: " + t.getMessage(), t);
-                    }
-                    res.setStatus(Response.CLIENT_ERROR);
-                    res.setErrorMessage(StringUtils.toString(t));
+                } else {
+                    res.setErrorMessage(in.readUTF());
                 }
-            } else {
-                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
+            } catch (Throwable t) {
+                if (log.isWarnEnabled()) {
+                    log.warn("Decode response failed: " + t.getMessage(), t);
+                }
+                res.setStatus(Response.CLIENT_ERROR);
+                res.setErrorMessage(StringUtils.toString(t));
             }
             return res;
         } else {
@@ -119,10 +117,11 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
             }
             try {
                 Object data;
+                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                 if (req.isHeartbeat()) {
-                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
+                    data = decodeHeartbeatData(channel, in);
                 } else if (req.isEvent()) {
-                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
+                    data = decodeEventData(channel, in);
                 } else {
                     DecodeableRpcInvocation inv;
                     if (channel.getUrl().getParameter(
@@ -149,11 +148,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
         }
     }
 
-    private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
-            throws IOException {
-        return serialization.deserialize(url, is);
-    }
-
     private byte[] readMessageData(InputStream is) throws IOException {
         if (is.available() > 0) {
             byte[] result = new byte[is.available()];