You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/03/20 15:56:28 UTC

[incubator-dubbo] branch master updated: Add unit test for unpack and stick pack of dubbo and telent (#3703)

This is an automated email from the ASF dual-hosted git repository.

huxing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e4ff91  Add unit test for unpack and stick pack of dubbo and telent (#3703)
6e4ff91 is described below

commit 6e4ff91dfca4395a8d1b180f40f632e97acf779d
Author: zhaixiaoxiang <xx...@126.com>
AuthorDate: Wed Mar 20 23:56:05 2019 +0800

    Add unit test for unpack and stick pack of dubbo and telent (#3703)
---
 .../transport/netty4/NettyCodecAdapter.java        |   2 +-
 .../dubbo/decode/DubboTelnetDecodeTest.java        | 477 +++++++++++++++++++++
 .../dubbo/decode/LocalEmbeddedChannel.java         |  35 ++
 .../rpc/protocol/dubbo/decode/MockChannel.java     | 115 +++++
 .../protocol/dubbo/decode/MockChannelHandler.java  |  61 +++
 .../rpc/protocol/dubbo/decode/MockHandler.java     |  40 ++
 6 files changed, 729 insertions(+), 1 deletion(-)

diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
index 5f3b784..e0e4513 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
@@ -33,7 +33,7 @@ import java.util.List;
 /**
  * NettyCodecAdapter.
  */
-final class NettyCodecAdapter {
+final public class NettyCodecAdapter {
 
     private final ChannelHandler encoder = new InternalEncoder();
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
new file mode 100644
index 0000000..e54d2e6
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
+import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler;
+import org.apache.dubbo.remoting.transport.DecodeHandler;
+import org.apache.dubbo.remoting.transport.MultiMessageHandler;
+import org.apache.dubbo.remoting.transport.netty4.NettyBackedChannelBuffer;
+import org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation;
+import org.apache.dubbo.rpc.protocol.dubbo.DubboCodec;
+import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * These JUnit tests verify decoding when dubbo and telnet packets arrive split ("unpacked") or stuck together ("sticky")
+ */
+public class DubboTelnetDecodeTest {
+    private static AtomicInteger dubbo = new AtomicInteger(0); // messages accepted in testDubboDecode
+
+    private static AtomicInteger telnet = new AtomicInteger(0); // messages accepted in testTelnetDecode
+
+    private static AtomicInteger telnetDubbo = new AtomicInteger(0); // messages accepted in testTelnetDubboDecoded
+
+    private static AtomicInteger dubboDubbo = new AtomicInteger(0); // messages accepted in testDubboDubboDecoded
+
+    private static AtomicInteger dubboTelnet = new AtomicInteger(0); // messages accepted in testDubboTelnetDecoded
+
+    private static AtomicInteger telnetTelnet = new AtomicInteger(0); // messages accepted in testTelnetTelnetDecoded
+
+    /**
+     * Feeds one complete dubbo request to the decoder in a single buffer and expects exactly one accepted invocation.
+     * @throws IOException          if {@link #createDubboByteBuf()} fails to encode the request
+     * @throws InterruptedException if interrupted while waiting for the channel to close or during the sleep
+     */
+    @Test
+    public void testDubboDecode() throws InterruptedException, IOException {
+        ByteBuf dubboByteBuf = createDubboByteBuf();
+
+        EmbeddedChannel ch = null;
+        try {
+            Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+            URL url = new URL("dubbo", "localhost", 22226);
+            NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+            MockHandler mockHandler = new MockHandler(null, // null consumer: MockChannel.send(Object) then does nothing
+                    new MultiMessageHandler(
+                            new DecodeHandler(
+                                    new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+                                        @Override
+                                        public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+                                            if (checkDubboDecoded(msg)) {
+                                                dubbo.incrementAndGet();
+                                            }
+                                            return null;
+                                        }
+                                    }))));
+
+            ch = new LocalEmbeddedChannel();
+            ch.pipeline()
+                    .addLast("decoder", adapter.getDecoder())
+                    .addLast("handler", mockHandler);
+
+            ch.writeInbound(dubboByteBuf); // the whole request in one inbound write
+        } catch (Exception e) {
+            e.printStackTrace(); // not rethrown; a failure here can only surface through the assertion below
+        } finally {
+            if (ch != null) {
+                ch.close().await(200, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        TimeUnit.MILLISECONDS.sleep(100); // NOTE(review): presumably lets pending handling settle -- confirm it is needed
+
+        Assertions.assertEquals(1, dubbo.get());
+    }
+
+    /**
+     * Feeds one complete telnet line ({@code ls} followed by CR LF) and expects the consumer handed to
+     * MockHandler to see exactly one message that passes {@link #checkTelnetDecoded(Object)}.
+     * @throws InterruptedException if interrupted while waiting for the channel to close or during the sleep
+     */
+    @Test
+    public void testTelnetDecode() throws InterruptedException {
+        ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("ls\r\n".getBytes());
+
+        EmbeddedChannel ch = null;
+        try {
+            Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+            URL url = new URL("dubbo", "localhost", 22226);
+            NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+            MockHandler mockHandler = new MockHandler((msg) -> { // invoked by MockChannel.send(Object)
+                if (checkTelnetDecoded(msg)) {
+                    telnet.incrementAndGet();
+                }
+            },
+                    new MultiMessageHandler(
+                            new DecodeHandler(
+                                    new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+                                        @Override
+                                        public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+                                            return null; // this test does not count dubbo invocations
+                                        }
+                                    }))));
+
+            ch = new LocalEmbeddedChannel();
+            ch.pipeline()
+                    .addLast("decoder", adapter.getDecoder())
+                    .addLast("handler", mockHandler);
+
+            ch.writeInbound(telnetByteBuf);
+        } catch (Exception e) {
+            e.printStackTrace(); // not rethrown; a failure here can only surface through the assertion below
+        } finally {
+            if (ch != null) {
+                ch.close().await(200, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        TimeUnit.MILLISECONDS.sleep(100);
+
+        Assertions.assertEquals(1, telnet.get());
+    }
+
+    /**
+     * A telnet line split across two buffers, with a complete dubbo request following it in the second buffer.
+     * Both the telnet message and the dubbo invocation bump the same counter, so the expected count is 2.
+     * <p>
+     * First ByteBuf:
+     * +--------------------------------------------------+
+     * |               telnet(incomplete)                 |
+     * +--------------------------------------------------+
+     * <p>
+     *
+     * Second ByteBuf:
+     * +--------------------------++----------------------+
+     * |  telnet(the remaining)   ||   dubbo(complete)    |
+     * +--------------------------++----------------------+
+     *                            ||
+     *                        Magic Code
+     * @throws IOException          if {@link #createDubboByteBuf()} fails to encode the request
+     * @throws InterruptedException if interrupted while waiting for the channel to close or during the sleep
+     */
+    @Test
+    public void testTelnetDubboDecoded() throws InterruptedException, IOException {
+        ByteBuf dubboByteBuf = createDubboByteBuf();
+
+        ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("ls\r".getBytes()); // line feed is still missing
+        EmbeddedChannel ch = null;
+        try {
+            Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+            URL url = new URL("dubbo", "localhost", 22226);
+            NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+            MockHandler mockHandler = new MockHandler((msg) -> { // invoked by MockChannel.send(Object)
+                if (checkTelnetDecoded(msg)) {
+                    telnetDubbo.incrementAndGet();
+                }
+            },
+                    new MultiMessageHandler(
+                            new DecodeHandler(
+                                    new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+                                        @Override
+                                        public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+                                            if (checkDubboDecoded(msg)) {
+                                                telnetDubbo.incrementAndGet();
+                                            }
+                                            return null;
+                                        }
+                                    }))));
+
+            ch = new LocalEmbeddedChannel();
+            ch.pipeline()
+                    .addLast("decoder", adapter.getDecoder())
+                    .addLast("handler", mockHandler);
+
+            ch.writeInbound(telnetByteBuf);
+            ch.writeInbound(Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("\n".getBytes()), dubboByteBuf)); // LF, then the dubbo request
+        } catch (Exception e) {
+            e.printStackTrace(); // not rethrown; a failure here can only surface through the assertion below
+        } finally {
+            if (ch != null) {
+                ch.close().await(200, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        TimeUnit.MILLISECONDS.sleep(100);
+
+        Assertions.assertEquals(2, telnetDubbo.get());
+    }
+
+    /**
+     * NOTE: This test case currently fails, which is why the {@code @Test} annotation below is commented out.
+     * The scenario is very unlikely in practice, and since dubbo 2.5.8 users should use telnet on the new
+     * QOS port (default port 22222), so the problem is ignored for now.
+     *
+     * <p>
+     * A telnet line split across two buffers, followed by a second complete telnet line in the second buffer.
+     *
+     * <p>
+     * First ByteBuf (firstByteBuf):
+     * +--------------------------------------------------+
+     * |               telnet(incomplete)                 |
+     * +--------------------------------------------------+
+     * <p>
+     *
+     * Second ByteBuf (secondByteBuf):
+     * +--------------------------------------------------+
+     * |  telnet(the remaining)   |   telnet(complete)    |
+     * +--------------------------------------------------+
+     *
+     * @throws InterruptedException if interrupted while waiting for the channel to close or during the sleep
+     */
+    // @Test -- disabled, see the NOTE in the Javadoc of this method
+    public void testTelnetTelnetDecoded() throws InterruptedException {
+        ByteBuf firstByteBuf = Unpooled.wrappedBuffer("ls\r".getBytes()); // line feed is still missing
+        ByteBuf secondByteBuf = Unpooled.wrappedBuffer("\nls\r\n".getBytes()); // LF, then a second full line
+
+        EmbeddedChannel ch = null;
+        try {
+            Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+            URL url = new URL("dubbo", "localhost", 22226);
+            NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+            MockHandler mockHandler = new MockHandler((msg) -> { // invoked by MockChannel.send(Object)
+                if (checkTelnetDecoded(msg)) {
+                    telnetTelnet.incrementAndGet();
+                }
+            },
+                    new MultiMessageHandler(
+                            new DecodeHandler(
+                                    new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+                                        @Override
+                                        public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+                                            return null; // this test does not count dubbo invocations
+                                        }
+                                    }))));
+
+            ch = new LocalEmbeddedChannel();
+            ch.pipeline()
+                    .addLast("decoder", adapter.getDecoder())
+                    .addLast("handler", mockHandler);
+
+            ch.writeInbound(firstByteBuf);
+            ch.writeInbound(secondByteBuf);
+        } catch (Exception e) {
+            e.printStackTrace(); // not rethrown; a failure here can only surface through the assertion below
+        } finally {
+            if (ch != null) {
+                ch.close().await(200, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        TimeUnit.MILLISECONDS.sleep(100);
+
+        Assertions.assertEquals(2, telnetTelnet.get());
+    }
+
+    /**
+     * A dubbo request split after its first 50 bytes, with a second complete dubbo request following the
+     * remainder in the second buffer. Both invocations are expected to be accepted (count 2).
+     * <p>
+     * First ByteBuf (firstDubboByteBuf):
+     * ++-------------------------------------------------+
+     * ||               dubbo(incomplete)                 |
+     * ++-------------------------------------------------+
+     * ||
+     * Magic Code
+     * <p>
+     *
+     * <p>
+     * Second ByteBuf (secondDubboByteBuf):
+     * +-------------------------++-----------------------+
+     * |  dubbo(the remaining)   ||    dubbo(complete)    |
+     * +-------------------------++-----------------------+
+     *                           ||
+     *                       Magic Code
+     * @throws IOException          if {@link #createDubboByteBuf()} fails to encode the request
+     * @throws InterruptedException if interrupted while waiting for the channel to close or during the sleep
+     */
+    @Test
+    public void testDubboDubboDecoded() throws InterruptedException, IOException {
+        ByteBuf dubboByteBuf = createDubboByteBuf();
+
+        ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50); // first 50 bytes of the request
+        ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes() - 50); // the rest of it
+        ByteBuf secondDubboByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, dubboByteBuf); // rest + a full request
+
+
+        EmbeddedChannel ch = null;
+        try {
+            Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+            URL url = new URL("dubbo", "localhost", 22226);
+            NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+            MockHandler mockHandler = new MockHandler(null, // null consumer: MockChannel.send(Object) then does nothing
+                    new MultiMessageHandler(
+                            new DecodeHandler(
+                                    new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+                                        @Override
+                                        public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+                                            if (checkDubboDecoded(msg)) {
+                                                dubboDubbo.incrementAndGet();
+                                            }
+                                            return null;
+                                        }
+                                    }))));
+
+            ch = new LocalEmbeddedChannel();
+            ch.pipeline()
+                    .addLast("decoder", adapter.getDecoder())
+                    .addLast("handler", mockHandler);
+
+            ch.writeInbound(firstDubboByteBuf);
+            ch.writeInbound(secondDubboByteBuf);
+        } catch (Exception e) {
+            e.printStackTrace(); // not rethrown; a failure here can only surface through the assertion below
+        } finally {
+            if (ch != null) {
+                ch.close().await(200, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        TimeUnit.MILLISECONDS.sleep(100);
+
+        Assertions.assertEquals(2, dubboDubbo.get());
+    }
+
+    /**
+     * A dubbo request split after its first 50 bytes, with a complete (empty) telnet line following the
+     * remainder in the second buffer. Both messages bump the same counter, so the expected count is 2.
+     * <p>
+     * First ByteBuf:
+     * ++-------------------------------------------------+
+     * ||               dubbo(incomplete)                 |
+     * ++-------------------------------------------------+
+     * ||
+     * Magic Code
+     *
+     * <p>
+     * Second ByteBuf:
+     * +--------------------------------------------------+
+     * |  dubbo(the remaining)  |     telnet(complete)    |
+     * +--------------------------------------------------+
+     * @throws IOException          if {@link #createDubboByteBuf()} fails to encode the request
+     * @throws InterruptedException if interrupted while waiting for the channel to close or during the sleep
+     */
+    @Test
+    public void testDubboTelnetDecoded() throws InterruptedException, IOException {
+        ByteBuf dubboByteBuf = createDubboByteBuf();
+        ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50); // first 50 bytes of the request
+        ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes() - 50); // copy(index, length): a length, not an end index
+
+        ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("\r\n".getBytes()); // an empty telnet line
+        ByteBuf secondByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, telnetByteBuf); // rest of dubbo + telnet
+
+        EmbeddedChannel ch = null;
+        try {
+            Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
+            URL url = new URL("dubbo", "localhost", 22226);
+            NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
+
+            MockHandler mockHandler = new MockHandler((msg) -> { // invoked by MockChannel.send(Object)
+                if (checkTelnetDecoded(msg)) {
+                    dubboTelnet.incrementAndGet();
+                }
+            },
+                    new MultiMessageHandler(
+                            new DecodeHandler(
+                                    new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
+                                        @Override
+                                        public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
+                                            if (checkDubboDecoded(msg)) {
+                                                dubboTelnet.incrementAndGet();
+                                            }
+                                            return null;
+                                        }
+                                    }))));
+
+            ch = new LocalEmbeddedChannel();
+            ch.pipeline()
+                    .addLast("decoder", adapter.getDecoder())
+                    .addLast("handler", mockHandler);
+
+            ch.writeInbound(firstDubboByteBuf);
+            ch.writeInbound(secondByteBuf);
+        } catch (Exception e) {
+            e.printStackTrace(); // not rethrown; a failure here can only surface through the assertion below
+        } finally {
+            if (ch != null) {
+                ch.close().await(200, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        TimeUnit.MILLISECONDS.sleep(100);
+
+        Assertions.assertEquals(2, dubboTelnet.get());
+    }
+
+    private ByteBuf createDubboByteBuf() throws IOException { // builds one encoded sayHello("dubbo") request
+        Request request = new Request();
+        RpcInvocation rpcInvocation = new RpcInvocation();
+        rpcInvocation.setMethodName("sayHello");
+        rpcInvocation.setParameterTypes(new Class[]{String.class});
+        rpcInvocation.setArguments(new String[]{"dubbo"});
+        rpcInvocation.setAttachment("path", DemoService.class.getName());
+        rpcInvocation.setAttachment("interface", DemoService.class.getName());
+        rpcInvocation.setAttachment("version", "0.0.0");
+        // the values set above are exactly the ones checkDubboDecoded(Object) compares against
+        request.setData(rpcInvocation);
+        request.setVersion("2.0.2");
+
+        ByteBuf dubboByteBuf = Unpooled.buffer();
+        ChannelBuffer buffer = new NettyBackedChannelBuffer(dubboByteBuf); // wrapper handed to the codec
+        DubboCodec dubboCodec = new DubboCodec();
+        dubboCodec.encode(new MockChannel(), buffer, request);
+
+        return dubboByteBuf; // NOTE(review): presumably now holds the encoded bytes via the wrapper -- confirm
+    }
+
+    private static boolean checkTelnetDecoded(Object msg) { // true for any String lacking the rejection text
+        if (msg != null && msg instanceof String && !msg.toString().contains("Unsupported command:")) {
+            return true;
+        }
+        return false; // null, non-String, or a reply containing "Unsupported command:"
+    }
+
+    private static boolean checkDubboDecoded(Object msg) { // true only for the invocation built by createDubboByteBuf()
+        if (msg instanceof DecodeableRpcInvocation) {
+            DecodeableRpcInvocation invocation = (DecodeableRpcInvocation) msg;
+            if ("sayHello".equals(invocation.getMethodName()) // every field set in createDubboByteBuf() must match
+                    && invocation.getParameterTypes().length == 1
+                    && String.class.equals(invocation.getParameterTypes()[0])
+                    && invocation.getArguments().length == 1
+                    && "dubbo".equals(invocation.getArguments()[0])
+                    && DemoService.class.getName().equals(invocation.getAttachment("path"))
+                    && DemoService.class.getName().equals(invocation.getAttachment("interface"))
+                    && "0.0.0".equals(invocation.getAttachment("version"))) {
+                return true;
+            }
+        }
+        return false; // wrong type or any mismatching field
+    }
+}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java
new file mode 100644
index 0000000..adc590d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.utils.NetUtils;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+public class LocalEmbeddedChannel extends EmbeddedChannel { // EmbeddedChannel that reports socket addresses of its own
+    public SocketAddress localAddress() { // NOTE(review): presumably overrides the inherited accessor; no @Override -- confirm
+        return new InetSocketAddress(20883); // fixed port 20883, same port MockChannel.getLocalAddress() uses
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return new InetSocketAddress(NetUtils.getAvailablePort()); // new address on each call, port picked by NetUtils
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java
new file mode 100644
index 0000000..80a2dc5
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import java.net.InetSocketAddress;
+import java.util.function.Consumer;
+
+public class MockChannel implements Channel { // Channel stub: fixed answers, send(Object) forwards to an optional consumer
+    private Consumer<Object> consumer; // stays null when the no-arg constructor is used
+
+    public MockChannel() {
+        // no consumer: send(Object) becomes a no-op
+    }
+
+    public MockChannel(Consumer<Object> consumer) {
+        this.consumer = consumer; // may be null; send(Object) checks before calling it
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return new InetSocketAddress(NetUtils.getAvailablePort()); // new address on each call
+    }
+
+    @Override
+    public boolean isConnected() {
+        return false;
+    }
+
+    @Override
+    public boolean hasAttribute(String key) {
+        return false; // attributes are never stored, see setAttribute
+    }
+
+    @Override
+    public Object getAttribute(String key) {
+        return null; // attributes are never stored, see setAttribute
+    }
+
+    @Override
+    public void setAttribute(String key, Object value) {
+        // no-op: the value is discarded
+    }
+
+    @Override
+    public void removeAttribute(String key) {
+        // no-op
+    }
+
+    @Override
+    public URL getUrl() {
+        return new URL("dubbo", "localhost", 20880);
+    }
+
+    @Override
+    public ChannelHandler getChannelHandler() {
+        return null;
+    }
+
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        return new InetSocketAddress(20883);
+    }
+
+    @Override
+    public void send(Object message) throws RemotingException {
+        if (consumer != null) {
+            consumer.accept(message); // hand the outgoing message to the test's callback
+        }
+    }
+
+    @Override
+    public void send(Object message, boolean sent) throws RemotingException {
+        // no-op: unlike send(Object), this overload does not notify the consumer
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+
+    @Override
+    public void close(int timeout) {
+        // no-op
+    }
+
+    @Override
+    public void startClose() {
+        // no-op
+    }
+
+    @Override
+    public boolean isClosed() {
+        return false;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java
new file mode 100644
index 0000000..4929297
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class MockChannelHandler implements ChannelHandler { // tracks connected channels and echoes messages back
+    // Channels added in connected() and removed in disconnected().
+    ConcurrentHashSet<Channel> channels = new ConcurrentHashSet<Channel>();
+
+    @Override
+    public void connected(Channel channel) throws RemotingException {
+        channels.add(channel);
+    }
+
+    @Override
+    public void disconnected(Channel channel) throws RemotingException {
+        channels.remove(channel);
+    }
+
+    @Override
+    public void sent(Channel channel, Object message) throws RemotingException {
+        channel.send(message); // same action as received(): the message goes back out on the channel
+    }
+
+    @Override
+    public void received(Channel channel, Object message) throws RemotingException {
+        // echo: send the received message straight back on the same channel
+        channel.send(message);
+    }
+
+    @Override
+    public void caught(Channel channel, Throwable exception) throws RemotingException {
+        throw new RemotingException(channel, exception); // wrap and rethrow, keeping the original as the cause argument
+
+    }
+
+    public Set<Channel> getChannels() { // read-only view of the tracked channels
+        return Collections.unmodifiableSet(channels);
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java
new file mode 100644
index 0000000..f4f432a
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.decode;
+
+import org.apache.dubbo.remoting.ChannelHandler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.util.function.Consumer;
+
+public class MockHandler extends ChannelDuplexHandler { // netty handler that passes inbound messages to a dubbo ChannelHandler
+    private final Consumer<Object> consumer; // may be null; given to each MockChannel created in channelRead
+
+    private final ChannelHandler handler; // dubbo-side handler that receives every inbound message
+
+    public MockHandler(Consumer<Object> consumer, ChannelHandler handler) {
+        this.consumer = consumer;
+        this.handler = handler;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        this.handler.received(new MockChannel(consumer), msg); // a new MockChannel per message; ctx is not used
+    }
+}