You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/03/20 15:56:28 UTC
[incubator-dubbo] branch master updated: Add unit test for unpack
and stick pack of dubbo and telent (#3703)
This is an automated email from the ASF dual-hosted git repository.
huxing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 6e4ff91 Add unit test for unpack and stick pack of dubbo and telent (#3703)
6e4ff91 is described below
commit 6e4ff91dfca4395a8d1b180f40f632e97acf779d
Author: zhaixiaoxiang <xx...@126.com>
AuthorDate: Wed Mar 20 23:56:05 2019 +0800
Add unit test for unpack and stick pack of dubbo and telent (#3703)
---
.../transport/netty4/NettyCodecAdapter.java | 2 +-
.../dubbo/decode/DubboTelnetDecodeTest.java | 477 +++++++++++++++++++++
.../dubbo/decode/LocalEmbeddedChannel.java | 35 ++
.../rpc/protocol/dubbo/decode/MockChannel.java | 115 +++++
.../protocol/dubbo/decode/MockChannelHandler.java | 61 +++
.../rpc/protocol/dubbo/decode/MockHandler.java | 40 ++
6 files changed, 729 insertions(+), 1 deletion(-)
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
index 5f3b784..e0e4513 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
@@ -33,7 +33,7 @@ import java.util.List;
/**
* NettyCodecAdapter.
*/
-final class NettyCodecAdapter {
+final public class NettyCodecAdapter {
private final ChannelHandler encoder = new InternalEncoder();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
new file mode 100644
index 0000000..e54d2e6
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
+import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler;
+import org.apache.dubbo.remoting.transport.DecodeHandler;
+import org.apache.dubbo.remoting.transport.MultiMessageHandler;
+import org.apache.dubbo.remoting.transport.netty4.NettyBackedChannelBuffer;
+import org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation;
+import org.apache.dubbo.rpc.protocol.dubbo.DubboCodec;
+import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * These junit tests aim to test unpack and stick pack of dubbo and telnet
+ */
+public class DubboTelnetDecodeTest {
+ private static AtomicInteger dubbo = new AtomicInteger(0);
+
+ private static AtomicInteger telnet = new AtomicInteger(0);
+
+ private static AtomicInteger telnetDubbo = new AtomicInteger(0);
+
+ private static AtomicInteger dubboDubbo = new AtomicInteger(0);
+
+ private static AtomicInteger dubboTelnet = new AtomicInteger(0);
+
+ private static AtomicInteger telnetTelnet = new AtomicInteger(0);
+
+ /**
+ * just dubbo request
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testDubboDecode() throws InterruptedException, IOException {
+ ByteBuf dubboByteBuf = createDubboByteBuf();
+
+ EmbeddedChannel ch = null;
+ try {
+ Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+ URL url = new URL("dubbo", "localhost", 22226);
+ NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+ MockHandler mockHandler = new MockHandler(null,
+ new MultiMessageHandler(
+ new DecodeHandler(
+ new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+ @Override
+ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+ if (checkDubboDecoded(msg)) {
+ dubbo.incrementAndGet();
+ }
+ return null;
+ }
+ }))));
+
+ ch = new LocalEmbeddedChannel();
+ ch.pipeline()
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("handler", mockHandler);
+
+ ch.writeInbound(dubboByteBuf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (ch != null) {
+ ch.close().await(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ Assertions.assertEquals(1, dubbo.get());
+ }
+
+ /**
+ * just telnet request
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTelnetDecode() throws InterruptedException {
+ ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("ls\r\n".getBytes());
+
+ EmbeddedChannel ch = null;
+ try {
+ Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+ URL url = new URL("dubbo", "localhost", 22226);
+ NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+ MockHandler mockHandler = new MockHandler((msg) -> {
+ if (checkTelnetDecoded(msg)) {
+ telnet.incrementAndGet();
+ }
+ },
+ new MultiMessageHandler(
+ new DecodeHandler(
+ new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+ @Override
+ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+ return null;
+ }
+ }))));
+
+ ch = new LocalEmbeddedChannel();
+ ch.pipeline()
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("handler", mockHandler);
+
+ ch.writeInbound(telnetByteBuf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (ch != null) {
+ ch.close().await(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ Assertions.assertEquals(1, telnet.get());
+ }
+
+ /**
+ * telnet and dubbo request
+ *
+ * <p>
+ * First ByteBuf:
+ * +--------------------------------------------------+
+ * | telnet(incomplete) |
+ * +--------------------------------------------------+
+ * <p>
+ *
+ * Second ByteBuf:
+ * +--------------------------++----------------------+
+ * | telnet(the remaining) || dubbo(complete) |
+ * +--------------------------++----------------------+
+ * ||
+ * Magic Code
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTelnetDubboDecoded() throws InterruptedException, IOException {
+ ByteBuf dubboByteBuf = createDubboByteBuf();
+
+ ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("ls\r".getBytes());
+ EmbeddedChannel ch = null;
+ try {
+ Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+ URL url = new URL("dubbo", "localhost", 22226);
+ NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+ MockHandler mockHandler = new MockHandler((msg) -> {
+ if (checkTelnetDecoded(msg)) {
+ telnetDubbo.incrementAndGet();
+ }
+ },
+ new MultiMessageHandler(
+ new DecodeHandler(
+ new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+ @Override
+ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+ if (checkDubboDecoded(msg)) {
+ telnetDubbo.incrementAndGet();
+ }
+ return null;
+ }
+ }))));
+
+ ch = new LocalEmbeddedChannel();
+ ch.pipeline()
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("handler", mockHandler);
+
+ ch.writeInbound(telnetByteBuf);
+ ch.writeInbound(Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("\n".getBytes()), dubboByteBuf));
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (ch != null) {
+ ch.close().await(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ Assertions.assertEquals(2, telnetDubbo.get());
+ }
+
+ /**
+ * NOTE: This test case actually will fail, but the probability of this case is very small,
+ * and users should use telnet in new QOS port(default port is 22222) since dubbo 2.5.8,
+ * so we could ignore this problem.
+ *
+ * <p>
+ * telnet and telnet request
+ *
+ * <p>
+ * First ByteBuf (firstByteBuf):
+ * +--------------------------------------------------+
+ * | telnet(incomplete) |
+ * +--------------------------------------------------+
+ * <p>
+ *
+ * Second ByteBuf (secondByteBuf):
+ * +--------------------------------------------------+
+ * | telnet(the remaining) | telnet(complete) |
+ * +--------------------------------------------------+
+ *
+ * @throws InterruptedException
+ */
+ // @Test
+ public void testTelnetTelnetDecoded() throws InterruptedException {
+ ByteBuf firstByteBuf = Unpooled.wrappedBuffer("ls\r".getBytes());
+ ByteBuf secondByteBuf = Unpooled.wrappedBuffer("\nls\r\n".getBytes());
+
+ EmbeddedChannel ch = null;
+ try {
+ Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+ URL url = new URL("dubbo", "localhost", 22226);
+ NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+ MockHandler mockHandler = new MockHandler((msg) -> {
+ if (checkTelnetDecoded(msg)) {
+ telnetTelnet.incrementAndGet();
+ }
+ },
+ new MultiMessageHandler(
+ new DecodeHandler(
+ new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+ @Override
+ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+ return null;
+ }
+ }))));
+
+ ch = new LocalEmbeddedChannel();
+ ch.pipeline()
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("handler", mockHandler);
+
+ ch.writeInbound(firstByteBuf);
+ ch.writeInbound(secondByteBuf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (ch != null) {
+ ch.close().await(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ Assertions.assertEquals(2, telnetTelnet.get());
+ }
+
+ /**
+ * dubbo and dubbo request
+ *
+ * <p>
+ * First ByteBuf (firstDubboByteBuf):
+ * ++-------------------------------------------------+
+ * || dubbo(incomplete) |
+ * ++-------------------------------------------------+
+ * ||
+ * Magic Code
+ * <p>
+ *
+ * <p>
+ * Second ByteBuf (secondDubboByteBuf):
+ * +-------------------------++-----------------------+
+ * | dubbo(the remaining) || dubbo(complete) |
+ * +-------------------------++-----------------------+
+ * ||
+ * Magic Code
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testDubboDubboDecoded() throws InterruptedException, IOException {
+ ByteBuf dubboByteBuf = createDubboByteBuf();
+
+ ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50);
+ ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes() - 50);
+ ByteBuf secondDubboByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, dubboByteBuf);
+
+
+ EmbeddedChannel ch = null;
+ try {
+ Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+ URL url = new URL("dubbo", "localhost", 22226);
+ NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+ MockHandler mockHandler = new MockHandler(null,
+ new MultiMessageHandler(
+ new DecodeHandler(
+ new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+ @Override
+ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+ if (checkDubboDecoded(msg)) {
+ dubboDubbo.incrementAndGet();
+ }
+ return null;
+ }
+ }))));
+
+ ch = new LocalEmbeddedChannel();
+ ch.pipeline()
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("handler", mockHandler);
+
+ ch.writeInbound(firstDubboByteBuf);
+ ch.writeInbound(secondDubboByteBuf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (ch != null) {
+ ch.close().await(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ Assertions.assertEquals(2, dubboDubbo.get());
+ }
+
+ /**
+ * dubbo and telnet request
+ *
+ * <p>
+ * First ByteBuf:
+ * ++-------------------------------------------------+
+ * || dubbo(incomplete) |
+ * ++-------------------------------------------------+
+ * ||
+ * Magic Code
+ *
+ * <p>
+ * Second ByteBuf:
+ * +--------------------------------------------------+
+ * | dubbo(the remaining) | telnet(complete) |
+ * +--------------------------------------------------+
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testDubboTelnetDecoded() throws InterruptedException, IOException {
+ ByteBuf dubboByteBuf = createDubboByteBuf();
+ ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50);
+ ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes());
+
+ ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("\r\n".getBytes());
+ ByteBuf secondByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, telnetByteBuf);
+
+ EmbeddedChannel ch = null;
+ try {
+ Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+ URL url = new URL("dubbo", "localhost", 22226);
+ NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+ MockHandler mockHandler = new MockHandler((msg) -> {
+ if (checkTelnetDecoded(msg)) {
+ dubboTelnet.incrementAndGet();
+ }
+ },
+ new MultiMessageHandler(
+ new DecodeHandler(
+ new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+ @Override
+ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+ if (checkDubboDecoded(msg)) {
+ dubboTelnet.incrementAndGet();
+ }
+ return null;
+ }
+ }))));
+
+ ch = new LocalEmbeddedChannel();
+ ch.pipeline()
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("handler", mockHandler);
+
+ ch.writeInbound(firstDubboByteBuf);
+ ch.writeInbound(secondByteBuf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (ch != null) {
+ ch.close().await(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ Assertions.assertEquals(2, dubboTelnet.get());
+ }
+
+ private ByteBuf createDubboByteBuf() throws IOException {
+ Request request = new Request();
+ RpcInvocation rpcInvocation = new RpcInvocation();
+ rpcInvocation.setMethodName("sayHello");
+ rpcInvocation.setParameterTypes(new Class[]{String.class});
+ rpcInvocation.setArguments(new String[]{"dubbo"});
+ rpcInvocation.setAttachment("path", DemoService.class.getName());
+ rpcInvocation.setAttachment("interface", DemoService.class.getName());
+ rpcInvocation.setAttachment("version", "0.0.0");
+
+ request.setData(rpcInvocation);
+ request.setVersion("2.0.2");
+
+ ByteBuf dubboByteBuf = Unpooled.buffer();
+ ChannelBuffer buffer = new NettyBackedChannelBuffer(dubboByteBuf);
+ DubboCodec dubboCodec = new DubboCodec();
+ dubboCodec.encode(new MockChannel(), buffer, request);
+
+ return dubboByteBuf;
+ }
+
+ private static boolean checkTelnetDecoded(Object msg) {
+ if (msg != null && msg instanceof String && !msg.toString().contains("Unsupported command:")) {
+ return true;
+ }
+ return false;
+ }
+
+ private static boolean checkDubboDecoded(Object msg) {
+ if (msg instanceof DecodeableRpcInvocation) {
+ DecodeableRpcInvocation invocation = (DecodeableRpcInvocation) msg;
+ if ("sayHello".equals(invocation.getMethodName())
+ && invocation.getParameterTypes().length == 1
+ && String.class.equals(invocation.getParameterTypes()[0])
+ && invocation.getArguments().length == 1
+ && "dubbo".equals(invocation.getArguments()[0])
+ && DemoService.class.getName().equals(invocation.getAttachment("path"))
+ && DemoService.class.getName().equals(invocation.getAttachment("interface"))
+ && "0.0.0".equals(invocation.getAttachment("version"))) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java
new file mode 100644
index 0000000..adc590d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.utils.NetUtils;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+public class LocalEmbeddedChannel extends EmbeddedChannel {
+ public SocketAddress localAddress() {
+ return new InetSocketAddress(20883);
+ }
+
+ @Override
+ protected SocketAddress remoteAddress0() {
+ return new InetSocketAddress(NetUtils.getAvailablePort());
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java
new file mode 100644
index 0000000..80a2dc5
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import java.net.InetSocketAddress;
+import java.util.function.Consumer;
+
+public class MockChannel implements Channel {
+ private Consumer consumer;
+
+ public MockChannel() {
+
+ }
+
+ public MockChannel(Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return new InetSocketAddress(NetUtils.getAvailablePort());
+ }
+
+ @Override
+ public boolean isConnected() {
+ return false;
+ }
+
+ @Override
+ public boolean hasAttribute(String key) {
+ return false;
+ }
+
+ @Override
+ public Object getAttribute(String key) {
+ return null;
+ }
+
+ @Override
+ public void setAttribute(String key, Object value) {
+
+ }
+
+ @Override
+ public void removeAttribute(String key) {
+
+ }
+
+ @Override
+ public URL getUrl() {
+ return new URL("dubbo", "localhost", 20880);
+ }
+
+ @Override
+ public ChannelHandler getChannelHandler() {
+ return null;
+ }
+
+ @Override
+ public InetSocketAddress getLocalAddress() {
+ return new InetSocketAddress(20883);
+ }
+
+ @Override
+ public void send(Object message) throws RemotingException {
+ if (consumer != null) {
+ consumer.accept(message);
+ }
+ }
+
+ @Override
+ public void send(Object message, boolean sent) throws RemotingException {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void close(int timeout) {
+
+ }
+
+ @Override
+ public void startClose() {
+
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java
new file mode 100644
index 0000000..4929297
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class MockChannelHandler implements ChannelHandler {
+ // ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<String, Channel>();
+ ConcurrentHashSet<Channel> channels = new ConcurrentHashSet<Channel>();
+
+ @Override
+ public void connected(Channel channel) throws RemotingException {
+ channels.add(channel);
+ }
+
+ @Override
+ public void disconnected(Channel channel) throws RemotingException {
+ channels.remove(channel);
+ }
+
+ @Override
+ public void sent(Channel channel, Object message) throws RemotingException {
+ channel.send(message);
+ }
+
+ @Override
+ public void received(Channel channel, Object message) throws RemotingException {
+ //echo
+ channel.send(message);
+ }
+
+ @Override
+ public void caught(Channel channel, Throwable exception) throws RemotingException {
+ throw new RemotingException(channel, exception);
+
+ }
+
+ public Set<Channel> getChannels() {
+ return Collections.unmodifiableSet(channels);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java
new file mode 100644
index 0000000..f4f432a
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.remoting.ChannelHandler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.util.function.Consumer;
+
+public class MockHandler extends ChannelDuplexHandler {
+ private final Consumer consumer;
+
+ private final ChannelHandler handler;
+
+ public MockHandler(Consumer consumer, ChannelHandler handler) {
+ this.consumer = consumer;
+ this.handler = handler;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ this.handler.received(new MockChannel(consumer), msg);
+ }
+}