You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/03/07 10:13:01 UTC

[incubator-dubbo] branch 3.x-dev updated: Merge pull request #3609, introduce rx support.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.x-dev
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/3.x-dev by this push:
     new 020697b  Merge pull request #3609, introduce rx support.
020697b is described below

commit 020697b0a0eb19dc0cafeb84012a4c25b17c85e6
Author: uglycow <xi...@gmail.com>
AuthorDate: Thu Mar 7 18:12:55 2019 +0800

    Merge pull request #3609, introduce rx support.
---
 dubbo-rpc/dubbo-rpc-rsocket/pom.xml                | 123 +++++
 .../rpc/protocol/rsocket/FutureSubscriber.java     |  97 ++++
 .../dubbo/rpc/protocol/rsocket/MetadataCodec.java  |  39 ++
 .../rpc/protocol/rsocket/RSocketConstants.java     |  35 ++
 .../rpc/protocol/rsocket/RSocketExporter.java      |  46 ++
 .../dubbo/rpc/protocol/rsocket/RSocketInvoker.java | 251 ++++++++++
 .../rpc/protocol/rsocket/RSocketProtocol.java      | 533 +++++++++++++++++++++
 .../dubbo/internal/org.apache.dubbo.rpc.Protocol   |   1 +
 .../dubbo/rpc/protocol/rsocket/ConsumerDemo.java   |  51 ++
 .../dubbo/rpc/protocol/rsocket/ProviderDemo.java   |  29 ++
 .../rpc/protocol/rsocket/RSocketProtocolTest.java  | 219 +++++++++
 .../apache/dubbo/rpc/service/DemoException.java    |  40 ++
 .../org/apache/dubbo/rpc/service/DemoService.java  |  65 +++
 .../apache/dubbo/rpc/service/DemoServiceImpl.java  | 153 ++++++
 .../apache/dubbo/rpc/service/RemoteService.java    |  23 +
 .../dubbo/rpc/service/RemoteServiceImpl.java       |  26 +
 .../dubbo-rpc-rsocket/src/test/resources/log4j.xml |  46 ++
 .../resources/spring/dubbo-rsocket-consumer.xml    |  36 ++
 .../resources/spring/dubbo-rsocket-provider.xml    |  40 ++
 dubbo-rpc/pom.xml                                  |   1 +
 20 files changed, 1854 insertions(+)

diff --git a/dubbo-rpc/dubbo-rpc-rsocket/pom.xml b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml
new file mode 100644
index 0000000..b38f74d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml
@@ -0,0 +1,123 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dubbo</groupId>
+        <artifactId>dubbo-rpc</artifactId>
+        <version>2.7.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>dubbo-rpc-rsocket</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The default rpc module of dubbo project</description>
+    <properties>
+        <skip_maven_deploy>false</skip_maven_deploy>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>4.3.16.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-multicast</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.rsocket</groupId>
+            <artifactId>rsocket-core</artifactId>
+            <version>0.11.14</version>
+        </dependency>
+        <dependency>
+            <groupId>io.rsocket</groupId>
+            <artifactId>rsocket-transport-netty</artifactId>
+            <version>0.11.14</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.54</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-rpc-api</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-api</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-config-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-config-spring</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-container-api</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-servlet</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-hessian2</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-jdk</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!--<dependency>-->
+        <!--<groupId>javax.validation</groupId>-->
+        <!--<artifactId>validation-api</artifactId>-->
+        <!--<scope>test</scope>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+        <!--<groupId>org.hibernate</groupId>-->
+        <!--<artifactId>hibernate-validator</artifactId>-->
+        <!--<scope>test</scope>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+        <!--<groupId>org.glassfish</groupId>-->
+        <!--<artifactId>javax.el</artifactId>-->
+        <!--<scope>test</scope>-->
+        <!--</dependency>-->
+    </dependencies>
+</project>
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java
new file mode 100644
index 0000000..8f12ba8
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.Payload;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.rpc.RpcResult;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class FutureSubscriber extends CompletableFuture<RpcResult> implements Subscriber<Payload> {
+
+    private final Serialization serialization;
+
+    private final Class<?> retType;
+
+    public FutureSubscriber(Serialization serialization, Class<?> retType) {
+        this.serialization = serialization;
+        this.retType = retType;
+    }
+
+
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        subscription.request(1);
+    }
+
+    @Override
+    public void onNext(Payload payload) {
+        try {
+            RpcResult rpcResult = new RpcResult();
+            ByteBuffer dataBuffer = payload.getData();
+            byte[] dataBytes = new byte[dataBuffer.remaining()];
+            dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+            InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+            ObjectInput in = serialization.deserialize(null, dataInputStream);
+
+            int flag = in.readByte();
+            if ((flag & RSocketConstants.FLAG_ERROR) != 0) {
+                Throwable t = (Throwable) in.readObject();
+                rpcResult.setException(t);
+            } else {
+                Object value = null;
+                if ((flag & RSocketConstants.FLAG_NULL_VALUE) == 0) {
+                    if (retType == null) {
+                        value = in.readObject();
+                    } else {
+                        value = in.readObject(retType);
+                    }
+                    rpcResult.setValue(value);
+                }
+            }
+
+            if ((flag & RSocketConstants.FLAG_HAS_ATTACHMENT) != 0) {
+                Map<String, String> attachment = in.readObject(Map.class);
+                rpcResult.setAttachments(attachment);
+
+            }
+
+            this.complete(rpcResult);
+
+
+        } catch (Throwable t) {
+            this.completeExceptionally(t);
+        }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        this.completeExceptionally(throwable);
+    }
+
+    @Override
+    public void onComplete() {
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java
new file mode 100644
index 0000000..ce2e1b5
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import com.alibaba.fastjson.JSON;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * @author sixie.xyn on 2019/1/3.
+ */
+public class MetadataCodec {
+
+    public static Map<String, Object> decodeMetadata(byte[] bytes) throws IOException {
+        return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Map.class);
+    }
+
+    public static byte[] encodeMetadata(Map<String, Object> metadata) throws IOException {
+        String jsonStr = JSON.toJSONString(metadata);
+        return jsonStr.getBytes(StandardCharsets.UTF_8);
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java
new file mode 100644
index 0000000..e6ad98a
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+/**
+ * @author sixie.xyn on 2019/1/3.
+ */
+public class RSocketConstants {
+
+    public static final String SERVICE_NAME_KEY = "_service_name";
+    public static final String SERVICE_VERSION_KEY = "_service_version";
+    public static final String METHOD_NAME_KEY = "_method_name";
+    public static final String PARAM_TYPE_KEY = "_param_type";
+    public static final String SERIALIZE_TYPE_KEY = "_serialize_type";
+    public static final String TIMEOUT_KEY = "_timeout";
+
+
+    public static final int FLAG_ERROR = 0x01;
+    public static final int FLAG_NULL_VALUE = 0x02;
+    public static final int FLAG_HAS_ATTACHMENT = 0x04;
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java
new file mode 100644
index 0000000..074085e
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.protocol.AbstractExporter;
+
+import java.util.Map;
+
+/**
+ * @author sixie.xyn on 2019/1/2.
+ */
+public class RSocketExporter<T> extends AbstractExporter<T> {
+
+    private final String key;
+
+    private final Map<String, Exporter<?>> exporterMap;
+
+    public RSocketExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
+        super(invoker);
+        this.key = key;
+        this.exporterMap = exporterMap;
+    }
+
+    @Override
+    public void unexport() {
+        super.unexport();
+        exporterMap.remove(key);
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java
new file mode 100644
index 0000000..bc5cf43
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.util.DefaultPayload;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.serialize.Cleanable;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.utils.AtomicPositiveInteger;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.RpcResult;
+import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+import org.apache.dubbo.rpc.support.RpcUtils;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+/**
+ * @author sixie.xyn on 2019/1/2.
+ */
+public class RSocketInvoker<T> extends AbstractInvoker<T> {
+
+    private final RSocket[] clients;
+
+    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
+
+    private final String version;
+
+    private final ReentrantLock destroyLock = new ReentrantLock();
+
+    private final Set<Invoker<?>> invokers;
+
+    private final Serialization serialization;
+
+    public RSocketInvoker(Class<T> serviceType, URL url, RSocket[] clients, Set<Invoker<?>> invokers) {
+        super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
+        this.clients = clients;
+        // get version.
+        this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
+        this.invokers = invokers;
+
+        this.serialization = CodecSupport.getSerialization(getUrl());
+    }
+
+    @Override
+    protected Result doInvoke(final Invocation invocation) throws Throwable {
+        RpcInvocation inv = (RpcInvocation) invocation;
+        final String methodName = RpcUtils.getMethodName(invocation);
+        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
+        inv.setAttachment(Constants.VERSION_KEY, version);
+
+        RSocket currentClient;
+        if (clients.length == 1) {
+            currentClient = clients[0];
+        } else {
+            currentClient = clients[index.getAndIncrement() % clients.length];
+        }
+        try {
+            //TODO support timeout
+            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
+
+            RpcContext.getContext().setFuture(null);
+            //encode inv: metadata and data(arg,attachment)
+            Payload requestPayload = encodeInvocation(invocation);
+
+            Class<?> retType = RpcUtils.getReturnType(invocation);
+
+            if (retType != null && retType.isAssignableFrom(Mono.class)) {
+                Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
+                Mono<Object> bizMono = responseMono.map(new Function<Payload, Object>() {
+                    @Override
+                    public Object apply(Payload payload) {
+                        return decodeData(payload);
+                    }
+                });
+                RpcResult rpcResult = new RpcResult();
+                rpcResult.setValue(bizMono);
+                return rpcResult;
+            } else if (retType != null && retType.isAssignableFrom(Flux.class)) {
+                return requestStream(currentClient, requestPayload);
+            } else {
+                //request-reponse
+                Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
+                FutureSubscriber futureSubscriber = new FutureSubscriber(serialization, retType);
+                responseMono.subscribe(futureSubscriber);
+                return (Result) futureSubscriber.get();
+            }
+
+            //TODO support stream arg
+        } catch (Throwable t) {
+            throw new RpcException(t);
+        }
+    }
+
+
+    private Result requestStream(RSocket currentClient, Payload requestPayload) {
+        Flux<Payload> responseFlux = currentClient.requestStream(requestPayload);
+        Flux<Object> retFlux = responseFlux.map(new Function<Payload, Object>() {
+
+            @Override
+            public Object apply(Payload payload) {
+                return decodeData(payload);
+            }
+        });
+
+        RpcResult rpcResult = new RpcResult();
+        rpcResult.setValue(retFlux);
+        return rpcResult;
+    }
+
+
+    private Object decodeData(Payload payload) {
+        try {
+            //TODO save the copy
+            ByteBuffer dataBuffer = payload.getData();
+            byte[] dataBytes = new byte[dataBuffer.remaining()];
+            dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+            InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+            ObjectInput in = serialization.deserialize(null, dataInputStream);
+            int flag = in.readByte();
+            if ((flag & RSocketConstants.FLAG_ERROR) != 0) {
+                Throwable t = (Throwable) in.readObject();
+                throw t;
+            } else {
+                return in.readObject();
+            }
+        } catch (Throwable t) {
+            throw Exceptions.propagate(t);
+        }
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (!super.isAvailable()) {
+            return false;
+        }
+        for (RSocket client : clients) {
+            if (client.availability() > 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void destroy() {
+        // in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every
+        // time when client.close() is called, counter counts down once, and when counter reaches zero, client will be
+        // closed.
+        if (super.isDestroyed()) {
+            return;
+        } else {
+            // double check to avoid dup close
+            destroyLock.lock();
+            try {
+                if (super.isDestroyed()) {
+                    return;
+                }
+                super.destroy();
+                if (invokers != null) {
+                    invokers.remove(this);
+                }
+                for (RSocket client : clients) {
+                    try {
+                        client.dispose();
+                    } catch (Throwable t) {
+                        logger.warn(t.getMessage(), t);
+                    }
+                }
+
+            } finally {
+                destroyLock.unlock();
+            }
+        }
+    }
+
+    private Payload encodeInvocation(Invocation invocation) throws IOException {
+        byte[] metadata = encodeMetadata(invocation);
+        byte[] data = encodeData(invocation);
+        return DefaultPayload.create(data, metadata);
+    }
+
+    private byte[] encodeMetadata(Invocation invocation) throws IOException {
+        Map<String, Object> metadataMap = new HashMap<String, Object>();
+        metadataMap.put(RSocketConstants.SERVICE_NAME_KEY, invocation.getAttachment(Constants.PATH_KEY));
+        metadataMap.put(RSocketConstants.SERVICE_VERSION_KEY, invocation.getAttachment(Constants.VERSION_KEY));
+        metadataMap.put(RSocketConstants.METHOD_NAME_KEY, invocation.getMethodName());
+        metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(invocation.getParameterTypes()));
+        metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, (Byte) serialization.getContentTypeId());
+        return MetadataCodec.encodeMetadata(metadataMap);
+    }
+
+
+    private byte[] encodeData(Invocation invocation) throws IOException {
+        ByteArrayOutputStream dataOutputStream = new ByteArrayOutputStream();
+        Serialization serialization = CodecSupport.getSerialization(getUrl());
+        ObjectOutput out = serialization.serialize(getUrl(), dataOutputStream);
+        RpcInvocation inv = (RpcInvocation) invocation;
+        Object[] args = inv.getArguments();
+        if (args != null) {
+            for (int i = 0; i < args.length; i++) {
+                out.writeObject(args[i]);
+            }
+        }
+        out.writeObject(RpcUtils.getNecessaryAttachments(inv));
+
+        //clean
+        out.flushBuffer();
+        if (out instanceof Cleanable) {
+            ((Cleanable) out).cleanup();
+        }
+        return dataOutputStream.toByteArray();
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java
new file mode 100644
index 0000000..6d480d0
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.AbstractRSocket;
+import io.rsocket.ConnectionSetupPayload;
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.RSocketFactory;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.transport.netty.server.CloseableChannel;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.DefaultPayload;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.protocol.AbstractProtocol;
+import org.apache.dubbo.rpc.support.RpcUtils;
+import org.reactivestreams.Publisher;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+/**
+ * @author sixie.xyn on 2019/1/2.
+ */
+public class RSocketProtocol extends AbstractProtocol {
+
+    public static final String NAME = "rsocket";
+    public static final int DEFAULT_PORT = 30880;
+    private static final Logger log = LoggerFactory.getLogger(RSocketProtocol.class);
+    private static RSocketProtocol INSTANCE;
+
+    // <host:port,CloseableChannel>
+    private final Map<String, CloseableChannel> serverMap = new ConcurrentHashMap<String, CloseableChannel>();
+
+    // <host:port,RSocket>
+    private final Map<String, RSocket> referenceClientMap = new ConcurrentHashMap<String, RSocket>();
+
+    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
+
+    public RSocketProtocol() {
+        INSTANCE = this;
+    }
+
+    public static RSocketProtocol getRSocketProtocol() {
+        if (INSTANCE == null) {
+            ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(RSocketProtocol.NAME); // load
+        }
+        return INSTANCE;
+    }
+
+    public Collection<Exporter<?>> getExporters() {
+        return Collections.unmodifiableCollection(exporterMap.values());
+    }
+
+    Map<String, Exporter<?>> getExporterMap() {
+        return exporterMap;
+    }
+
+    Invoker<?> getInvoker(int port, Map<String, Object> metadataMap) throws RemotingException {
+        String path = (String) metadataMap.get(RSocketConstants.SERVICE_NAME_KEY);
+        String serviceKey = serviceKey(port, path, (String) metadataMap.get(RSocketConstants.SERVICE_VERSION_KEY), (String) metadataMap.get(Constants.GROUP_KEY));
+        RSocketExporter<?> exporter = (RSocketExporter<?>) exporterMap.get(serviceKey);
+        if (exporter == null) {
+            //throw new Throwable("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
+            throw new RuntimeException("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch ");
+        }
+
+        return exporter.getInvoker();
+    }
+
+    public Collection<Invoker<?>> getInvokers() {
+        return Collections.unmodifiableCollection(invokers);
+    }
+
+    @Override
+    public int getDefaultPort() {
+        return DEFAULT_PORT;
+    }
+
+    @Override
+    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
+        URL url = invoker.getUrl();
+
+        // export service.
+        String key = serviceKey(url);
+        RSocketExporter<T> exporter = new RSocketExporter<T>(invoker, key, exporterMap);
+        exporterMap.put(key, exporter);
+
+        openServer(url);
+        return exporter;
+    }
+
+    private void openServer(URL url) {
+        String key = url.getAddress();
+        //client can export a service which's only for server to invoke
+        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
+        if (isServer) {
+            CloseableChannel server = serverMap.get(key);
+            if (server == null) {
+                synchronized (this) {
+                    server = serverMap.get(key);
+                    if (server == null) {
+                        serverMap.put(key, createServer(url));
+                    }
+                }
+            }
+        }
+    }
+
+    private CloseableChannel createServer(URL url) {
+        try {
+            String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
+            int bindPort = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
+            if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
+                bindIp = NetUtils.ANYHOST;
+            }
+            return RSocketFactory.receive()
+                    .acceptor(new SocketAcceptorImpl(bindPort))
+                    .transport(TcpServerTransport.create(bindIp, bindPort))
+                    .start()
+                    .block();
+        } catch (Throwable e) {
+            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
+        }
+    }
+
+
+    @Override
+    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
+        // create rpc invoker.
+        RSocketInvoker<T> invoker = new RSocketInvoker<T>(serviceType, url, getClients(url), invokers);
+        invokers.add(invoker);
+        return invoker;
+    }
+
+    private RSocket[] getClients(URL url) {
+        // whether to share connection
+        boolean service_share_connect = false;
+        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
+        // if not configured, connection is shared, otherwise, one connection for one service
+        if (connections == 0) {
+            service_share_connect = true;
+            connections = 1;
+        }
+
+        RSocket[] clients = new RSocket[connections];
+        for (int i = 0; i < clients.length; i++) {
+            if (service_share_connect) {
+                clients[i] = getSharedClient(url);
+            } else {
+                clients[i] = initClient(url);
+            }
+        }
+        return clients;
+    }
+
+    /**
+     * Get shared connection
+     */
+    private RSocket getSharedClient(URL url) {
+        String key = url.getAddress();
+        RSocket client = referenceClientMap.get(key);
+        if (client != null) {
+            return client;
+        }
+
+        locks.putIfAbsent(key, new Object());
+        synchronized (locks.get(key)) {
+            if (referenceClientMap.containsKey(key)) {
+                return referenceClientMap.get(key);
+            }
+
+            client = initClient(url);
+            referenceClientMap.put(key, client);
+            locks.remove(key);
+            return client;
+        }
+    }
+
+    /**
+     * Create new connection
+     */
+    private RSocket initClient(URL url) {
+        try {
+            InetSocketAddress serverAddress = new InetSocketAddress(NetUtils.filterLocalHost(url.getHost()), url.getPort());
+            RSocket client = RSocketFactory.connect().keepAliveTickPeriod(Duration.ZERO).keepAliveAckTimeout(Duration.ZERO).acceptor(
+                    rSocket ->
+                            new AbstractRSocket() {
+                                public Mono<Payload> requestResponse(Payload payload) {
+                                    //TODO support Mono arg
+                                    throw new UnsupportedOperationException();
+                                }
+
+                                @Override
+                                public Flux<Payload> requestStream(Payload payload) {
+                                    //TODO support Flux arg
+                                    throw new UnsupportedOperationException();
+                                }
+                            })
+                    .transport(TcpClientTransport.create(serverAddress))
+                    .start()
+                    .block();
+            return client;
+        } catch (Throwable e) {
+            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
+        }
+
+    }
+
+    @Override
+    public void destroy() {
+        for (String key : new ArrayList<String>(serverMap.keySet())) {
+            CloseableChannel server = serverMap.remove(key);
+            if (server != null) {
+                try {
+                    if (logger.isInfoEnabled()) {
+                        logger.info("Close dubbo server: " + server.address());
+                    }
+                    server.dispose();
+                } catch (Throwable t) {
+                    logger.warn(t.getMessage(), t);
+                }
+            }
+        }
+
+        for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
+            RSocket client = referenceClientMap.remove(key);
+            if (client != null) {
+                try {
+//                    if (logger.isInfoEnabled()) {
+//                        logger.info("Close dubbo connect: " + client. + "-->" + client.getRemoteAddress());
+//                    }
+                    client.dispose();
+                } catch (Throwable t) {
+                    logger.warn(t.getMessage(), t);
+                }
+            }
+        }
+        super.destroy();
+    }
+
+
+    //server process logic
+    private class SocketAcceptorImpl implements SocketAcceptor {
+
+        private final int port;
+
+        public SocketAcceptorImpl(int port) {
+            this.port = port;
+        }
+
+        @Override
+        public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
+            return Mono.just(
+                    new AbstractRSocket() {
+                        public Mono<Payload> requestResponse(Payload payload) {
+                            try {
+                                Map<String, Object> metadata = decodeMetadata(payload);
+                                Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
+                                Invocation inv = decodeInvocation(payload, metadata, serializeId);
+
+                                Result result = inv.getInvoker().invoke(inv);
+
+                                Class<?> retType = RpcUtils.getReturnType(inv);
+                                //ok
+                                if (retType != null && Mono.class.isAssignableFrom(retType)) {
+                                    Throwable th = result.getException();
+                                    if (th == null) {
+                                        Mono bizMono = (Mono) result.getValue();
+                                        Mono<Payload> retMono = bizMono.map(new Function<Object, Payload>() {
+                                            @Override
+                                            public Payload apply(Object o) {
+                                                try {
+                                                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                                                    ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+                                                    out.writeByte((byte) 0);
+                                                    out.writeObject(o);
+                                                    out.flushBuffer();
+                                                    bos.flush();
+                                                    bos.close();
+                                                    Payload responsePayload = DefaultPayload.create(bos.toByteArray());
+                                                    return responsePayload;
+                                                } catch (Throwable t) {
+                                                    throw Exceptions.propagate(t);
+                                                }
+                                            }
+                                        }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
+                                            @Override
+                                            public Publisher<Payload> apply(Throwable throwable) {
+                                                try {
+                                                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                                                    ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+                                                    out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+                                                    out.writeObject(throwable);
+                                                    out.flushBuffer();
+                                                    bos.flush();
+                                                    bos.close();
+                                                    Payload errorPayload = DefaultPayload.create(bos.toByteArray());
+                                                    return Flux.just(errorPayload);
+                                                } catch (Throwable t) {
+                                                    throw Exceptions.propagate(t);
+                                                }
+                                            }
+                                        });
+
+                                        return retMono;
+                                    } else {
+                                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                                        ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+                                        out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+                                        out.writeObject(th);
+                                        out.flushBuffer();
+                                        bos.flush();
+                                        bos.close();
+                                        Payload errorPayload = DefaultPayload.create(bos.toByteArray());
+                                        return Mono.just(errorPayload);
+                                    }
+
+                                } else {
+                                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                                    ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+                                    int flag = RSocketConstants.FLAG_HAS_ATTACHMENT;
+
+                                    Throwable th = result.getException();
+                                    if (th == null) {
+                                        Object ret = result.getValue();
+                                        if (ret == null) {
+                                            flag |= RSocketConstants.FLAG_NULL_VALUE;
+                                            out.writeByte((byte) flag);
+                                        } else {
+                                            out.writeByte((byte) flag);
+                                            out.writeObject(ret);
+                                        }
+                                    } else {
+                                        flag |= RSocketConstants.FLAG_ERROR;
+                                        out.writeByte((byte) flag);
+                                        out.writeObject(th);
+                                    }
+                                    out.writeObject(result.getAttachments());
+                                    out.flushBuffer();
+                                    bos.flush();
+                                    bos.close();
+
+                                    Payload responsePayload = DefaultPayload.create(bos.toByteArray());
+                                    return Mono.just(responsePayload);
+                                }
+                            } catch (Throwable t) {
+                                //application error
+                                return Mono.error(t);
+                            } finally {
+                                payload.release();
+                            }
+                        }
+
+                        public Flux<Payload> requestStream(Payload payload) {
+                            try {
+                                Map<String, Object> metadata = decodeMetadata(payload);
+                                Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
+                                Invocation inv = decodeInvocation(payload, metadata, serializeId);
+
+                                Result result = inv.getInvoker().invoke(inv);
+                                //Class<?> retType = RpcUtils.getReturnType(inv);
+
+                                Throwable th = result.getException();
+                                if (th != null) {
+                                    Payload errorPayload = encodeError(th, serializeId);
+                                    return Flux.just(errorPayload);
+                                }
+
+                                Flux flux = (Flux) result.getValue();
+                                Flux<Payload> retFlux = flux.map(new Function<Object, Payload>() {
+                                    @Override
+                                    public Payload apply(Object o) {
+                                        try {
+                                            return encodeData(o, serializeId);
+                                        } catch (Throwable t) {
+                                            throw new RuntimeException(t);
+                                        }
+                                    }
+                                }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
+                                    @Override
+                                    public Publisher<Payload> apply(Throwable throwable) {
+                                        try {
+                                            Payload errorPayload = encodeError(throwable, serializeId);
+                                            return Flux.just(errorPayload);
+                                        } catch (Throwable t) {
+                                            throw new RuntimeException(t);
+                                        }
+                                    }
+                                });
+                                return retFlux;
+                            } catch (Throwable t) {
+                                return Flux.error(t);
+                            } finally {
+                                payload.release();
+                            }
+                        }
+
+                        private Payload encodeData(Object data, byte serializeId) throws Throwable {
+                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                            ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+                            out.writeByte((byte) 0);
+                            out.writeObject(data);
+                            out.flushBuffer();
+                            bos.flush();
+                            bos.close();
+                            return DefaultPayload.create(bos.toByteArray());
+                        }
+
+                        private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable {
+                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                            ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+                            out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+                            out.writeObject(throwable);
+                            out.flushBuffer();
+                            bos.flush();
+                            bos.close();
+                            return DefaultPayload.create(bos.toByteArray());
+                        }
+
+                        private Map<String, Object> decodeMetadata(Payload payload) throws IOException {
+                            ByteBuffer metadataBuffer = payload.getMetadata();
+                            byte[] metadataBytes = new byte[metadataBuffer.remaining()];
+                            metadataBuffer.get(metadataBytes, metadataBuffer.position(), metadataBuffer.remaining());
+                            return MetadataCodec.decodeMetadata(metadataBytes);
+                        }
+
+                        private Invocation decodeInvocation(Payload payload, Map<String, Object> metadata, Byte serializeId) throws RemotingException, IOException, ClassNotFoundException {
+                            Invoker<?> invoker = getInvoker(port, metadata);
+
+                            String serviceName = (String) metadata.get(RSocketConstants.SERVICE_NAME_KEY);
+                            String version = (String) metadata.get(RSocketConstants.SERVICE_VERSION_KEY);
+                            String methodName = (String) metadata.get(RSocketConstants.METHOD_NAME_KEY);
+                            String paramType = (String) metadata.get(RSocketConstants.PARAM_TYPE_KEY);
+
+                            ByteBuffer dataBuffer = payload.getData();
+                            byte[] dataBytes = new byte[dataBuffer.remaining()];
+                            dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+
+
+                            //TODO how to get remote address
+                            //RpcContext rpcContext = RpcContext.getContext();
+                            //rpcContext.setRemoteAddress(channel.getRemoteAddress());
+
+
+                            RpcInvocation inv = new RpcInvocation();
+                            inv.setInvoker(invoker);
+                            inv.setAttachment(Constants.PATH_KEY, serviceName);
+                            inv.setAttachment(Constants.VERSION_KEY, version);
+                            inv.setMethodName(methodName);
+
+
+                            InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+                            ObjectInput in = CodecSupport.getSerializationById(serializeId).deserialize(null, dataInputStream);
+
+                            Object[] args;
+                            Class<?>[] pts;
+                            String desc = paramType;
+                            if (desc.length() == 0) {
+                                pts = new Class<?>[0];
+                                args = new Object[0];
+                            } else {
+                                pts = ReflectUtils.desc2classArray(desc);
+                                args = new Object[pts.length];
+                                for (int i = 0; i < args.length; i++) {
+                                    try {
+                                        args[i] = in.readObject(pts[i]);
+                                    } catch (Exception e) {
+                                        if (log.isWarnEnabled()) {
+                                            log.warn("Decode argument failed: " + e.getMessage(), e);
+                                        }
+                                    }
+                                }
+                            }
+                            inv.setParameterTypes(pts);
+                            inv.setArguments(args);
+                            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
+                            if (map != null && map.size() > 0) {
+                                inv.addAttachments(map);
+                            }
+                            return inv;
+                        }
+                    });
+        }
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol b/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
new file mode 100644
index 0000000..4f03810
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
@@ -0,0 +1 @@
+rsocket=org.apache.dubbo.rpc.protocol.rsocket.RSocketProtocol
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java
new file mode 100644
index 0000000..d73bbae
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.apache.dubbo.rpc.service.DemoService;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Consumer;
+
+public class ConsumerDemo {
+
+    public static void main(String[] args) {
+        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-rsocket-consumer.xml"});
+        context.start();
+        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
+
+        while (true) {
+            try {
+                Thread.sleep(1000);
+                Mono<String> resultMono = demoService.requestMono("world"); // call remote method
+                resultMono.doOnNext(new Consumer<String>() {
+                    @Override
+                    public void accept(String s) {
+                        System.out.println(s); // get result
+                    }
+                }).block();
+            } catch (Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+
+        }
+
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java
new file mode 100644
index 0000000..2e7466d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class ProviderDemo {
+
+    public static void main(String[] args) throws Exception {
+        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-rsocket-provider.xml"});
+        context.start();
+        System.in.read(); // press any key to exit
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java
new file mode 100644
index 0000000..e34a6f7
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.ProxyFactory;
+import org.apache.dubbo.rpc.service.DemoException;
+import org.apache.dubbo.rpc.service.DemoService;
+import org.apache.dubbo.rpc.service.DemoServiceImpl;
+import org.apache.dubbo.rpc.service.EchoService;
+import org.apache.dubbo.rpc.service.RemoteService;
+import org.apache.dubbo.rpc.service.RemoteServiceImpl;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+public class RSocketProtocolTest {
+
+    private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
+    private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
+
+    @AfterClass
+    public static void after() {
+        RSocketProtocol.getRSocketProtocol().destroy();
+    }
+
+    @Test
+    public void testDemoProtocol() throws Exception {
+        DemoService service = new DemoServiceImpl();
+        protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName())));
+        service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+        assertEquals(service.getSize(new String[]{"", "", ""}), 3);
+    }
+
+    @Test
+    public void testDubboProtocol() throws Exception {
+        DemoService service = new DemoServiceImpl();
+        protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName())));
+        service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+
+        assertEquals(service.getSize(null), -1);
+        assertEquals(service.getSize(new String[]{"", "", ""}), 3);
+
+
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("aa", "bb");
+        Set<String> set = service.keys(map);
+        assertEquals(set.size(), 1);
+        assertEquals(set.iterator().next(), "aa");
+        service.invoke("rsocket://127.0.0.1:9010/" + DemoService.class.getName() + "", "invoke");
+
+        StringBuffer buf = new StringBuffer();
+        for (int i = 0; i < 1024 * 32 + 32; i++)
+            buf.append('A');
+        System.out.println(service.stringLength(buf.toString()));
+
+        // cast to EchoService
+        EchoService echo = proxy.getProxy(protocol.refer(EchoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+        assertEquals(echo.$echo(buf.toString()), buf.toString());
+        assertEquals(echo.$echo("test"), "test");
+        assertEquals(echo.$echo("abcdefg"), "abcdefg");
+        assertEquals(echo.$echo(1234), 1234);
+    }
+
+
+    @Test
+    public void testDubboProtocolThrowable() throws Exception {
+        DemoService service = new DemoServiceImpl();
+        protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName())));
+        service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+        try {
+            service.errorTest("mike");
+        } catch (Throwable t) {
+            assertEquals(t.getClass(), ArithmeticException.class);
+        }
+    }
+
+    @Test
+    public void testDubboProtocolMultiService() throws Exception {
+        DemoService service = new DemoServiceImpl();
+        protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName())));
+        service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+
+        RemoteService remote = new RemoteServiceImpl();
+        protocol.export(proxy.getInvoker(remote, RemoteService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + RemoteService.class.getName())));
+        remote = proxy.getProxy(protocol.refer(RemoteService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + RemoteService.class.getName()).addParameter("timeout", 3000l)));
+
+        service.sayHello("world");
+
+        // test netty client
+        assertEquals("world", service.echo("world"));
+        assertEquals("hello world", remote.sayHello("world"));
+
+        EchoService serviceEcho = (EchoService) service;
+        assertEquals(serviceEcho.$echo("test"), "test");
+
+        EchoService remoteEecho = (EchoService) remote;
+        assertEquals(remoteEecho.$echo("ok"), "ok");
+    }
+
+
+    @Test
+    public void testRequestMono() throws Exception {
+        DemoService service = new DemoServiceImpl();
+        protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName())));
+        service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+        Mono<String> result = service.requestMono("mike");
+
+        result.doOnNext(new Consumer<String>() {
+            @Override
+            public void accept(String s) {
+                assertEquals(s, "hello mike");
+                System.out.println(s);
+            }
+        }).block();
+
+        Mono<String> result2 = service.requestMonoOnError("mike");
+        result2.onErrorResume(DemoException.class, new Function<DemoException, Mono<String>>() {
+            @Override
+            public Mono<String> apply(DemoException e) {
+                return Mono.just(e.getClass().getName());
+            }
+        }).doOnNext(new Consumer<String>() {
+            @Override
+            public void accept(String s) {
+                assertEquals(DemoException.class.getName(), s);
+            }
+        }).block();
+
+        Mono<String> result3 = service.requestMonoBizError("mike");
+        result3.onErrorResume(ArithmeticException.class, new Function<ArithmeticException, Mono<String>>() {
+            @Override
+            public Mono<String> apply(ArithmeticException e) {
+                return Mono.just(e.getClass().getName());
+            }
+        }).doOnNext(new Consumer<String>() {
+            @Override
+            public void accept(String s) {
+                assertEquals(ArithmeticException.class.getName(), s);
+            }
+        }).block();
+
+    }
+
+    @Test
+    public void testRequestFlux() throws Exception {
+        DemoService service = new DemoServiceImpl();
+        protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName())));
+        service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+
+        {
+            Flux<String> result = service.requestFlux("mike");
+            result.doOnNext(new Consumer<String>() {
+                @Override
+                public void accept(String s) {
+                    System.out.println(s);
+                }
+            }).blockLast();
+        }
+
+
+        {
+            Flux<String> result2 = service.requestFluxOnError("mike");
+            result2.onErrorResume(new Function<Throwable, Publisher<? extends String>>() {
+                @Override
+                public Publisher<? extends String> apply(Throwable throwable) {
+                    return Flux.just(throwable.getClass().getName());
+                }
+            }).takeLast(1).doOnNext(new Consumer<String>() {
+                @Override
+                public void accept(String s) {
+                    assertEquals(DemoException.class.getName(), s);
+                }
+            }).blockLast();
+        }
+
+        {
+            Flux<String> result3 = service.requestFluxBizError("mike");
+            result3.onErrorResume(new Function<Throwable, Publisher<? extends String>>() {
+                @Override
+                public Publisher<? extends String> apply(Throwable throwable) {
+                    return Flux.just(throwable.getClass().getName());
+                }
+            }).takeLast(1).doOnNext(new Consumer<String>() {
+                @Override
+                public void accept(String s) {
+                    assertEquals(ArithmeticException.class.getName(), s);
+                }
+            }).blockLast();
+        }
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java
new file mode 100644
index 0000000..33f3a2e
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+public class DemoException extends Exception {
+
+    private static final long serialVersionUID = -8213943026163641747L;
+
+    public DemoException() {
+        super();
+    }
+
+    public DemoException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DemoException(String message) {
+        super(message);
+    }
+
+    public DemoException(Throwable cause) {
+        super(cause);
+    }
+
+}
+
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java
new file mode 100644
index 0000000..b2b37b4
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface DemoService {
+    String sayHello(String name);
+
+    Set<String> keys(Map<String, String> map);
+
+    String echo(String text);
+
+    Map echo(Map map);
+
+    long timestamp();
+
+    String getThreadName();
+
+    int getSize(String[] strs);
+
+    int getSize(Object[] os);
+
+    Object invoke(String service, String method) throws Exception;
+
+    int stringLength(String str);
+
+    byte getbyte(byte arg);
+
+    long add(int a, long b);
+
+    String errorTest(String name);
+
+    Mono<String> requestMono(String name);
+
+    Mono<String> requestMonoOnError(String name);
+
+    Mono<String> requestMonoBizError(String name);
+
+    Flux<String> requestFlux(String name);
+
+    Flux<String> requestFluxOnError(String name);
+
+    Flux<String> requestFluxBizError(String name);
+
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java
new file mode 100644
index 0000000..b67e3e0
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import org.apache.dubbo.rpc.RpcContext;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public class DemoServiceImpl implements DemoService {
+    public DemoServiceImpl() {
+        super();
+    }
+
+    public String sayHello(String name) {
+        return "hello " + name;
+    }
+
+    public String echo(String text) {
+        return text;
+    }
+
+    public Map echo(Map map) {
+        return map;
+    }
+
+    public long timestamp() {
+        return System.currentTimeMillis();
+    }
+
+    public String getThreadName() {
+        return Thread.currentThread().getName();
+    }
+
+    public int getSize(String[] strs) {
+        if (strs == null)
+            return -1;
+        return strs.length;
+    }
+
+    public int getSize(Object[] os) {
+        if (os == null)
+            return -1;
+        return os.length;
+    }
+
+    public Object invoke(String service, String method) throws Exception {
+        System.out.println("RpcContext.getContext().getRemoteHost()=" + RpcContext.getContext().getRemoteHost());
+        return service + ":" + method;
+    }
+
+    public int stringLength(String str) {
+        return str.length();
+    }
+
+
+    public byte getbyte(byte arg) {
+        return arg;
+    }
+
+
+    public Set<String> keys(Map<String, String> map) {
+        return map == null ? null : map.keySet();
+    }
+
+
+    public long add(int a, long b) {
+        return a + b;
+    }
+
+    @Override
+    public String errorTest(String name) {
+        int a = 1 / 0;
+        return null;
+    }
+
+    public Mono<String> requestMono(String name) {
+        return Mono.just("hello " + name);
+    }
+
+    public Mono<String> requestMonoOnError(String name) {
+        return Mono.error(new DemoException(name));
+    }
+
+    public Mono<String> requestMonoBizError(String name) {
+        int a = 1 / 0;
+        return Mono.just("hello " + name);
+    }
+
+    @Override
+    public Flux<String> requestFlux(String name) {
+
+        return Flux.create(new Consumer<FluxSink<String>>() {
+            @Override
+            public void accept(FluxSink<String> fluxSink) {
+                for (int i = 0; i < 5; i++) {
+                    fluxSink.next(name + " " + i);
+                }
+                fluxSink.complete();
+            }
+        });
+
+    }
+
+    @Override
+    public Flux<String> requestFluxOnError(String name) {
+
+        return Flux.create(new Consumer<FluxSink<String>>() {
+            @Override
+            public void accept(FluxSink<String> fluxSink) {
+                for (int i = 0; i < 5; i++) {
+                    fluxSink.next(name + " " + i);
+                }
+                fluxSink.error(new DemoException());
+            }
+        });
+
+    }
+
+    @Override
+    public Flux<String> requestFluxBizError(String name) {
+        int a = 1 / 0;
+        return Flux.create(new Consumer<FluxSink<String>>() {
+            @Override
+            public void accept(FluxSink<String> fluxSink) {
+                for (int i = 0; i < 5; i++) {
+                    fluxSink.next(name + " " + i);
+                }
+                fluxSink.error(new DemoException());
+            }
+        });
+    }
+
+}
+
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java
new file mode 100644
index 0000000..d3e21dc
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import java.rmi.RemoteException;
+
+public interface RemoteService {
+    String sayHello(String name) throws RemoteException;
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java
new file mode 100644
index 0000000..afb5788
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import java.rmi.RemoteException;
+
+public class RemoteServiceImpl implements RemoteService {
+    @Override
+    public String sayHello(String name) throws RemoteException {
+        return "hello " + name;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml
new file mode 100644
index 0000000..3c5d2ba
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <!-- ===================================================================== -->
+    <!-- 以下是appender的定义 -->
+    <!-- ===================================================================== -->
+    <appender name="dubbo" class="org.apache.dubbo.common.utils.DubboAppender">
+        <param name="encoding" value="GBK"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %p [%c:%M] - %m%n"/>
+        </layout>
+        <!-- <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="LevelMin" value="DEBUG" />
+            <param name="LevelMax" value="DEBUG" />
+        </filter> -->
+    </appender>
+    <appender name="FILE" class="org.apache.log4j.FileAppender">
+        <param name="File" value="dubbo.log"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <!-- <param name="ConversionPattern" value="[%t %d{dd/MM/yy HH:mm:ss:SSS
+                z}] %5p %c{2}: %L %m%n" /> -->
+            <param name="ConversionPattern" value="[%t %l %d{dd/MM/yy HH:mm:ss:SSS z}] %5p %m %n"/>
+        </layout>
+    </appender>
+    <root>
+        <level value="INFO"/>
+        <appender-ref ref="dubbo"/>
+        <appender-ref ref="FILE"/>
+    </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml
new file mode 100644
index 0000000..f0f25cf
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
+       xmlns="http://www.springframework.org/schema/beans"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
+
+    <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),
+    don't set it same as provider -->
+    <dubbo:application name="demo-consumer"/>
+
+    <!-- use multicast registry center to discover service -->
+    <dubbo:registry address="multicast://224.5.6.7:1234"/>
+
+    <!-- generate proxy for the remote service, then demoService can be used in the same way as the
+    local regular interface -->
+    <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.rpc.service.DemoService"/>
+
+</beans>
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml
new file mode 100644
index 0000000..e84fb70
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
+       xmlns="http://www.springframework.org/schema/beans"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
+
+    <!-- provider's application name, used for tracing dependency relationship -->
+    <dubbo:application name="demo-provider"/>
+
+    <!-- use multicast registry center to export service -->
+    <dubbo:registry address="multicast://224.5.6.7:1234"/>
+
+    <!-- use rsocket protocol to export service on port 20880 -->
+    <dubbo:protocol name="rsocket" port="20890"/>
+
+    <!-- service implementation, as same as regular local bean -->
+    <bean id="demoService" class="org.apache.dubbo.rpc.service.DemoServiceImpl"/>
+
+    <!-- declare the service interface to be exported -->
+    <dubbo:service interface="org.apache.dubbo.rpc.service.DemoService" ref="demoService"/>
+
+</beans>
\ No newline at end of file
diff --git a/dubbo-rpc/pom.xml b/dubbo-rpc/pom.xml
index 9a2cb64..184bc30 100644
--- a/dubbo-rpc/pom.xml
+++ b/dubbo-rpc/pom.xml
@@ -40,5 +40,6 @@
         <module>dubbo-rpc-memcached</module>
         <module>dubbo-rpc-redis</module>
         <module>dubbo-rpc-rest</module>
+        <module>dubbo-rpc-rsocket</module>
     </modules>
 </project>