You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/10/19 03:20:06 UTC

[incubator-dubbo] branch master updated: Merge pull request #2656, make sure serialization exception is sent back to consumer to prevent endless waiting.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new b723837  Merge pull request #2656, make sure serialization exception is sent back to consumer to prevent endless waiting.
b723837 is described below

commit b7238372a2f65d03517693a0a99ce63e2c03cc42
Author: Ian Luo <ia...@gmail.com>
AuthorDate: Fri Oct 19 11:19:57 2018 +0800

    Merge pull request #2656, make sure serialization exception is sent back to consumer to prevent endless waiting.
    
    Fixes #1903: Our customized serialization id exceeds the maximum limit, now it cannot work on 2.6.2 anymore.
---
 .../remoting/exchange/codec/ExchangeCodec.java     | 23 +++++++++++++---------
 .../dubbo/remoting/codec/ExchangeCodecTest.java    | 18 +++++++++++++++++
 .../transport/codec/DeprecatedExchangeCodec.java   | 23 +++++++++++++---------
 3 files changed, 46 insertions(+), 18 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 4b18cad..e6d8b07 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -139,8 +139,6 @@ public class ExchangeCodec extends TelnetCodec {
 
     protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
         byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
-        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
-        ObjectInput in = s.deserialize(channel.getUrl(), is);
         // get request id.
         long id = Bytes.bytes2long(header, 4);
         if ((flag & FLAG_REQUEST) == 0) {
@@ -152,8 +150,9 @@ public class ExchangeCodec extends TelnetCodec {
             // get status.
             byte status = header[3];
             res.setStatus(status);
-            if (status == Response.OK) {
-                try {
+            try {
+                ObjectInput in = deserialize(channel, is, proto);
+                if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
                         data = decodeHeartbeatData(channel, in);
@@ -163,12 +162,12 @@ public class ExchangeCodec extends TelnetCodec {
                         data = decodeResponseData(channel, in, getRequestData(id));
                     }
                     res.setResult(data);
-                } catch (Throwable t) {
-                    res.setStatus(Response.CLIENT_ERROR);
-                    res.setErrorMessage(StringUtils.toString(t));
+                } else {
+                    res.setErrorMessage(in.readUTF());
                 }
-            } else {
-                res.setErrorMessage(in.readUTF());
+            } catch (Throwable t) {
+                res.setStatus(Response.CLIENT_ERROR);
+                res.setErrorMessage(StringUtils.toString(t));
             }
             return res;
         } else {
@@ -180,6 +179,7 @@ public class ExchangeCodec extends TelnetCodec {
                 req.setEvent(Request.HEARTBEAT_EVENT);
             }
             try {
+                ObjectInput in = deserialize(channel, is, proto);
                 Object data;
                 if (req.isHeartbeat()) {
                     data = decodeHeartbeatData(channel, in);
@@ -198,6 +198,11 @@ public class ExchangeCodec extends TelnetCodec {
         }
     }
 
+    private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException {
+        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
+        return s.deserialize(channel.getUrl(), is);
+    }
+
     protected Object getRequestData(long id) {
         DefaultFuture future = DefaultFuture.getFuture(id);
         if (future == null)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
index cde663d..5c0d901 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
 import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
 
+import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -148,6 +149,23 @@ public class ExchangeCodecTest extends TelnetCodecTest {
     }
 
     @Test
+    public void testInvalidSerializaitonId() throws Exception {
+        byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x8F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+        Object obj =  decode(header);
+        Assert.assertTrue(obj instanceof Request);
+        Request request = (Request) obj;
+        Assert.assertTrue(request.isBroken());
+        Assert.assertTrue(request.getData() instanceof IOException);
+        header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x1F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+
+        obj = decode(header);
+        Assert.assertTrue(obj instanceof Response);
+        Response response = (Response) obj;
+        Assert.assertEquals(response.getStatus(), Response.CLIENT_ERROR);
+        Assert.assertTrue(response.getErrorMessage().contains("IOException"));
+    }
+
+    @Test
     public void test_Decode_Check_Payload() throws IOException {
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
         byte[] request = assemblyDataProtocol(header);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
index 5dc5962..41b6cc7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
@@ -130,8 +130,6 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
 
     protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
         byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
-        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
-        ObjectInput in = s.deserialize(channel.getUrl(), is);
         // get request id.
         long id = Bytes.bytes2long(header, 4);
         if ((flag & FLAG_REQUEST) == 0) {
@@ -143,8 +141,9 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
             // get status.
             byte status = header[3];
             res.setStatus(status);
-            if (status == Response.OK) {
-                try {
+            try {
+                ObjectInput in = deserialize(channel, is, proto);
+                if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
                         data = decodeHeartbeatData(channel, in);
@@ -154,12 +153,12 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
                         data = decodeResponseData(channel, in, getRequestData(id));
                     }
                     res.setResult(data);
-                } catch (Throwable t) {
-                    res.setStatus(Response.CLIENT_ERROR);
-                    res.setErrorMessage(StringUtils.toString(t));
+                } else {
+                    res.setErrorMessage(in.readUTF());
                 }
-            } else {
-                res.setErrorMessage(in.readUTF());
+            } catch (Throwable t) {
+                res.setStatus(Response.CLIENT_ERROR);
+                res.setErrorMessage(StringUtils.toString(t));
             }
             return res;
         } else {
@@ -171,6 +170,7 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
                 req.setEvent(Request.HEARTBEAT_EVENT);
             }
             try {
+                ObjectInput in = deserialize(channel, is, proto);
                 Object data;
                 if (req.isHeartbeat()) {
                     data = decodeHeartbeatData(channel, in);
@@ -189,6 +189,11 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
         }
     }
 
+    private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException {
+        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
+        return s.deserialize(channel.getUrl(), is);
+    }
+
     protected Object getRequestData(long id) {
         DefaultFuture future = DefaultFuture.getFuture(id);
         if (future == null)