You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by il...@apache.org on 2018/10/23 04:11:39 UTC
[incubator-dubbo] branch 2.6.x updated: #1903: merge issue 1903's
fix from 2.7.0 to 2.6.x (#2668)
This is an automated email from the ASF dual-hosted git repository.
iluo pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new a531a5c #1903: merge issue 1903's fix from 2.7.0 to 2.6.x (#2668)
a531a5c is described below
commit a531a5cb9ace8f06a6192d65756d828ab8767f47
Author: Ian Luo <ia...@gmail.com>
AuthorDate: Tue Oct 23 12:11:34 2018 +0800
#1903: merge issue 1903's fix from 2.7.0 to 2.6.x (#2668)
---
.../remoting/exchange/codec/ExchangeCodec.java | 18 +++++-----
.../dubbo/remoting/transport/CodecSupport.java | 7 ++++
.../dubbo/remoting/codec/ExchangeCodecTest.java | 17 ++++++++++
.../transport/codec/DeprecatedExchangeCodec.java | 18 +++++-----
.../dubbo/rpc/protocol/dubbo/DubboCodec.java | 38 +++++++++-------------
5 files changed, 58 insertions(+), 40 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
index c1b8e09..f1e2bb8 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -139,8 +139,6 @@ public class ExchangeCodec extends TelnetCodec {
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
- Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
- ObjectInput in = s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
@@ -152,8 +150,9 @@ public class ExchangeCodec extends TelnetCodec {
// get status.
byte status = header[3];
res.setStatus(status);
- if (status == Response.OK) {
- try {
+ try {
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
+ if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
@@ -163,12 +162,12 @@ public class ExchangeCodec extends TelnetCodec {
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
- } catch (Throwable t) {
- res.setStatus(Response.CLIENT_ERROR);
- res.setErrorMessage(StringUtils.toString(t));
+ } else {
+ res.setErrorMessage(in.readUTF());
}
- } else {
- res.setErrorMessage(in.readUTF());
+ } catch (Throwable t) {
+ res.setStatus(Response.CLIENT_ERROR);
+ res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
@@ -180,6 +179,7 @@ public class ExchangeCodec extends TelnetCodec {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
index a9b0c1a..3262e12 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
@@ -22,9 +22,11 @@ import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
+import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.serialize.Serialization;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -75,4 +77,9 @@ public class CodecSupport {
return serialization;
}
+ public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
+ Serialization s = getSerialization(url, proto);
+ return s.deserialize(url, is);
+ }
+
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
index e372e14..163c72c 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -148,6 +148,23 @@ public class ExchangeCodecTest extends TelnetCodecTest {
}
@Test
+ public void testInvalidSerializaitonId() throws Exception {
+ byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x8F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+ Object obj = decode(header);
+ Assert.assertTrue(obj instanceof Request);
+ Request request = (Request) obj;
+ Assert.assertTrue(request.isBroken());
+ Assert.assertTrue(request.getData() instanceof IOException);
+ header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x1F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+
+ obj = decode(header);
+ Assert.assertTrue(obj instanceof Response);
+ Response response = (Response) obj;
+ Assert.assertEquals(response.getStatus(), Response.CLIENT_ERROR);
+ Assert.assertTrue(response.getErrorMessage().contains("IOException"));
+ }
+
+ @Test
public void test_Decode_Check_Payload() throws IOException {
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
byte[] request = assemblyDataProtocol(header);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
index 2b44ada..bdbd280 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java
@@ -130,8 +130,6 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
- Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
- ObjectInput in = s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
@@ -143,8 +141,9 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
// get status.
byte status = header[3];
res.setStatus(status);
- if (status == Response.OK) {
- try {
+ try {
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
+ if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
@@ -154,12 +153,12 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
- } catch (Throwable t) {
- res.setStatus(Response.CLIENT_ERROR);
- res.setErrorMessage(StringUtils.toString(t));
+ } else {
+ res.setErrorMessage(in.readUTF());
}
- } else {
- res.setErrorMessage(in.readUTF());
+ } catch (Throwable t) {
+ res.setStatus(Response.CLIENT_ERROR);
+ res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
@@ -171,6 +170,7 @@ final class DeprecatedExchangeCodec extends DeprecatedTelnetCodec implements Cod
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 80e4209..b159191 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -17,7 +17,6 @@
package com.alibaba.dubbo.rpc.protocol.dubbo;
import com.alibaba.dubbo.common.Constants;
-import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
@@ -25,7 +24,6 @@ import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.serialize.ObjectOutput;
-import com.alibaba.dubbo.common.serialize.Serialization;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.Channel;
@@ -63,7 +61,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
- Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
@@ -75,13 +72,14 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
// get status.
byte status = header[3];
res.setStatus(status);
- if (status == Response.OK) {
- try {
+ try {
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
+ if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
- data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
+ data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
- data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
+ data = decodeEventData(channel, in);
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
@@ -98,15 +96,15 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
data = result;
}
res.setResult(data);
- } catch (Throwable t) {
- if (log.isWarnEnabled()) {
- log.warn("Decode response failed: " + t.getMessage(), t);
- }
- res.setStatus(Response.CLIENT_ERROR);
- res.setErrorMessage(StringUtils.toString(t));
+ } else {
+ res.setErrorMessage(in.readUTF());
}
- } else {
- res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
+ } catch (Throwable t) {
+ if (log.isWarnEnabled()) {
+ log.warn("Decode response failed: " + t.getMessage(), t);
+ }
+ res.setStatus(Response.CLIENT_ERROR);
+ res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
@@ -119,10 +117,11 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
}
try {
Object data;
+ ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (req.isHeartbeat()) {
- data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
+ data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
- data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
+ data = decodeEventData(channel, in);
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
@@ -149,11 +148,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
}
}
- private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
- throws IOException {
- return serialization.deserialize(url, is);
- }
-
private byte[] readMessageData(InputStream is) throws IOException {
if (is.available() > 0) {
byte[] result = new byte[is.available()];