You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/03/07 10:13:01 UTC
[incubator-dubbo] branch 3.x-dev updated: Merge pull request #3609,
introduce rx support.
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.x-dev
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/3.x-dev by this push:
new 020697b Merge pull request #3609, introduce rx support.
020697b is described below
commit 020697b0a0eb19dc0cafeb84012a4c25b17c85e6
Author: uglycow <xi...@gmail.com>
AuthorDate: Thu Mar 7 18:12:55 2019 +0800
Merge pull request #3609, introduce rx support.
---
dubbo-rpc/dubbo-rpc-rsocket/pom.xml | 123 +++++
.../rpc/protocol/rsocket/FutureSubscriber.java | 97 ++++
.../dubbo/rpc/protocol/rsocket/MetadataCodec.java | 39 ++
.../rpc/protocol/rsocket/RSocketConstants.java | 35 ++
.../rpc/protocol/rsocket/RSocketExporter.java | 46 ++
.../dubbo/rpc/protocol/rsocket/RSocketInvoker.java | 251 ++++++++++
.../rpc/protocol/rsocket/RSocketProtocol.java | 533 +++++++++++++++++++++
.../dubbo/internal/org.apache.dubbo.rpc.Protocol | 1 +
.../dubbo/rpc/protocol/rsocket/ConsumerDemo.java | 51 ++
.../dubbo/rpc/protocol/rsocket/ProviderDemo.java | 29 ++
.../rpc/protocol/rsocket/RSocketProtocolTest.java | 219 +++++++++
.../apache/dubbo/rpc/service/DemoException.java | 40 ++
.../org/apache/dubbo/rpc/service/DemoService.java | 65 +++
.../apache/dubbo/rpc/service/DemoServiceImpl.java | 153 ++++++
.../apache/dubbo/rpc/service/RemoteService.java | 23 +
.../dubbo/rpc/service/RemoteServiceImpl.java | 26 +
.../dubbo-rpc-rsocket/src/test/resources/log4j.xml | 46 ++
.../resources/spring/dubbo-rsocket-consumer.xml | 36 ++
.../resources/spring/dubbo-rsocket-provider.xml | 40 ++
dubbo-rpc/pom.xml | 1 +
20 files changed, 1854 insertions(+)
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/pom.xml b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml
new file mode 100644
index 0000000..b38f74d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml
@@ -0,0 +1,123 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-rpc</artifactId>
+ <version>2.7.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>dubbo-rpc-rsocket</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The default rpc module of dubbo project</description>
+ <properties>
+ <skip_maven_deploy>false</skip_maven_deploy>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>4.3.16.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-multicast</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.rsocket</groupId>
+ <artifactId>rsocket-core</artifactId>
+ <version>0.11.14</version>
+ </dependency>
+ <dependency>
+ <groupId>io.rsocket</groupId>
+ <artifactId>rsocket-transport-netty</artifactId>
+ <version>0.11.14</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.54</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-rpc-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-config-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-config-spring</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-container-api</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-serialization-hessian2</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-serialization-jdk</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!--<dependency>-->
+ <!--<groupId>javax.validation</groupId>-->
+ <!--<artifactId>validation-api</artifactId>-->
+ <!--<scope>test</scope>-->
+ <!--</dependency>-->
+ <!--<dependency>-->
+ <!--<groupId>org.hibernate</groupId>-->
+ <!--<artifactId>hibernate-validator</artifactId>-->
+ <!--<scope>test</scope>-->
+ <!--</dependency>-->
+ <!--<dependency>-->
+ <!--<groupId>org.glassfish</groupId>-->
+ <!--<artifactId>javax.el</artifactId>-->
+ <!--<scope>test</scope>-->
+ <!--</dependency>-->
+ </dependencies>
+</project>
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java
new file mode 100644
index 0000000..8f12ba8
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.Payload;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.rpc.RpcResult;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class FutureSubscriber extends CompletableFuture<RpcResult> implements Subscriber<Payload> {
+
+ private final Serialization serialization;
+
+ private final Class<?> retType;
+
+ public FutureSubscriber(Serialization serialization, Class<?> retType) {
+ this.serialization = serialization;
+ this.retType = retType;
+ }
+
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(Payload payload) {
+ try {
+ RpcResult rpcResult = new RpcResult();
+ ByteBuffer dataBuffer = payload.getData();
+ byte[] dataBytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+ InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+ ObjectInput in = serialization.deserialize(null, dataInputStream);
+
+ int flag = in.readByte();
+ if ((flag & RSocketConstants.FLAG_ERROR) != 0) {
+ Throwable t = (Throwable) in.readObject();
+ rpcResult.setException(t);
+ } else {
+ Object value = null;
+ if ((flag & RSocketConstants.FLAG_NULL_VALUE) == 0) {
+ if (retType == null) {
+ value = in.readObject();
+ } else {
+ value = in.readObject(retType);
+ }
+ rpcResult.setValue(value);
+ }
+ }
+
+ if ((flag & RSocketConstants.FLAG_HAS_ATTACHMENT) != 0) {
+ Map<String, String> attachment = in.readObject(Map.class);
+ rpcResult.setAttachments(attachment);
+
+ }
+
+ this.complete(rpcResult);
+
+
+ } catch (Throwable t) {
+ this.completeExceptionally(t);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ this.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java
new file mode 100644
index 0000000..ce2e1b5
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import com.alibaba.fastjson.JSON;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * @author sixie.xyn on 2019/1/3.
+ */
+public class MetadataCodec {
+
+ public static Map<String, Object> decodeMetadata(byte[] bytes) throws IOException {
+ return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Map.class);
+ }
+
+ public static byte[] encodeMetadata(Map<String, Object> metadata) throws IOException {
+ String jsonStr = JSON.toJSONString(metadata);
+ return jsonStr.getBytes(StandardCharsets.UTF_8);
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java
new file mode 100644
index 0000000..e6ad98a
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+/**
+ * @author sixie.xyn on 2019/1/3.
+ */
+public class RSocketConstants {
+
+ public static final String SERVICE_NAME_KEY = "_service_name";
+ public static final String SERVICE_VERSION_KEY = "_service_version";
+ public static final String METHOD_NAME_KEY = "_method_name";
+ public static final String PARAM_TYPE_KEY = "_param_type";
+ public static final String SERIALIZE_TYPE_KEY = "_serialize_type";
+ public static final String TIMEOUT_KEY = "_timeout";
+
+
+ public static final int FLAG_ERROR = 0x01;
+ public static final int FLAG_NULL_VALUE = 0x02;
+ public static final int FLAG_HAS_ATTACHMENT = 0x04;
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java
new file mode 100644
index 0000000..074085e
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.protocol.AbstractExporter;
+
+import java.util.Map;
+
+/**
+ * @author sixie.xyn on 2019/1/2.
+ */
+public class RSocketExporter<T> extends AbstractExporter<T> {
+
+ private final String key;
+
+ private final Map<String, Exporter<?>> exporterMap;
+
+ public RSocketExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
+ super(invoker);
+ this.key = key;
+ this.exporterMap = exporterMap;
+ }
+
+ @Override
+ public void unexport() {
+ super.unexport();
+ exporterMap.remove(key);
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java
new file mode 100644
index 0000000..bc5cf43
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.util.DefaultPayload;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.serialize.Cleanable;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.utils.AtomicPositiveInteger;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.RpcResult;
+import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+import org.apache.dubbo.rpc.support.RpcUtils;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+/**
+ * @author sixie.xyn on 2019/1/2.
+ */
+public class RSocketInvoker<T> extends AbstractInvoker<T> {
+
+ private final RSocket[] clients;
+
+ private final AtomicPositiveInteger index = new AtomicPositiveInteger();
+
+ private final String version;
+
+ private final ReentrantLock destroyLock = new ReentrantLock();
+
+ private final Set<Invoker<?>> invokers;
+
+ private final Serialization serialization;
+
+ public RSocketInvoker(Class<T> serviceType, URL url, RSocket[] clients, Set<Invoker<?>> invokers) {
+ super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
+ this.clients = clients;
+ // get version.
+ this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
+ this.invokers = invokers;
+
+ this.serialization = CodecSupport.getSerialization(getUrl());
+ }
+
+ @Override
+ protected Result doInvoke(final Invocation invocation) throws Throwable {
+ RpcInvocation inv = (RpcInvocation) invocation;
+ final String methodName = RpcUtils.getMethodName(invocation);
+ inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
+ inv.setAttachment(Constants.VERSION_KEY, version);
+
+ RSocket currentClient;
+ if (clients.length == 1) {
+ currentClient = clients[0];
+ } else {
+ currentClient = clients[index.getAndIncrement() % clients.length];
+ }
+ try {
+ //TODO support timeout
+ int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
+
+ RpcContext.getContext().setFuture(null);
+ //encode inv: metadata and data(arg,attachment)
+ Payload requestPayload = encodeInvocation(invocation);
+
+ Class<?> retType = RpcUtils.getReturnType(invocation);
+
+ if (retType != null && retType.isAssignableFrom(Mono.class)) {
+ Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
+ Mono<Object> bizMono = responseMono.map(new Function<Payload, Object>() {
+ @Override
+ public Object apply(Payload payload) {
+ return decodeData(payload);
+ }
+ });
+ RpcResult rpcResult = new RpcResult();
+ rpcResult.setValue(bizMono);
+ return rpcResult;
+ } else if (retType != null && retType.isAssignableFrom(Flux.class)) {
+ return requestStream(currentClient, requestPayload);
+ } else {
+ //request-reponse
+ Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
+ FutureSubscriber futureSubscriber = new FutureSubscriber(serialization, retType);
+ responseMono.subscribe(futureSubscriber);
+ return (Result) futureSubscriber.get();
+ }
+
+ //TODO support stream arg
+ } catch (Throwable t) {
+ throw new RpcException(t);
+ }
+ }
+
+
+ private Result requestStream(RSocket currentClient, Payload requestPayload) {
+ Flux<Payload> responseFlux = currentClient.requestStream(requestPayload);
+ Flux<Object> retFlux = responseFlux.map(new Function<Payload, Object>() {
+
+ @Override
+ public Object apply(Payload payload) {
+ return decodeData(payload);
+ }
+ });
+
+ RpcResult rpcResult = new RpcResult();
+ rpcResult.setValue(retFlux);
+ return rpcResult;
+ }
+
+
+ private Object decodeData(Payload payload) {
+ try {
+ //TODO save the copy
+ ByteBuffer dataBuffer = payload.getData();
+ byte[] dataBytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+ InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+ ObjectInput in = serialization.deserialize(null, dataInputStream);
+ int flag = in.readByte();
+ if ((flag & RSocketConstants.FLAG_ERROR) != 0) {
+ Throwable t = (Throwable) in.readObject();
+ throw t;
+ } else {
+ return in.readObject();
+ }
+ } catch (Throwable t) {
+ throw Exceptions.propagate(t);
+ }
+ }
+
+ @Override
+ public boolean isAvailable() {
+ if (!super.isAvailable()) {
+ return false;
+ }
+ for (RSocket client : clients) {
+ if (client.availability() > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void destroy() {
+ // in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every
+ // time when client.close() is called, counter counts down once, and when counter reaches zero, client will be
+ // closed.
+ if (super.isDestroyed()) {
+ return;
+ } else {
+ // double check to avoid dup close
+ destroyLock.lock();
+ try {
+ if (super.isDestroyed()) {
+ return;
+ }
+ super.destroy();
+ if (invokers != null) {
+ invokers.remove(this);
+ }
+ for (RSocket client : clients) {
+ try {
+ client.dispose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+
+ } finally {
+ destroyLock.unlock();
+ }
+ }
+ }
+
+ private Payload encodeInvocation(Invocation invocation) throws IOException {
+ byte[] metadata = encodeMetadata(invocation);
+ byte[] data = encodeData(invocation);
+ return DefaultPayload.create(data, metadata);
+ }
+
+ private byte[] encodeMetadata(Invocation invocation) throws IOException {
+ Map<String, Object> metadataMap = new HashMap<String, Object>();
+ metadataMap.put(RSocketConstants.SERVICE_NAME_KEY, invocation.getAttachment(Constants.PATH_KEY));
+ metadataMap.put(RSocketConstants.SERVICE_VERSION_KEY, invocation.getAttachment(Constants.VERSION_KEY));
+ metadataMap.put(RSocketConstants.METHOD_NAME_KEY, invocation.getMethodName());
+ metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(invocation.getParameterTypes()));
+ metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, (Byte) serialization.getContentTypeId());
+ return MetadataCodec.encodeMetadata(metadataMap);
+ }
+
+
+ private byte[] encodeData(Invocation invocation) throws IOException {
+ ByteArrayOutputStream dataOutputStream = new ByteArrayOutputStream();
+ Serialization serialization = CodecSupport.getSerialization(getUrl());
+ ObjectOutput out = serialization.serialize(getUrl(), dataOutputStream);
+ RpcInvocation inv = (RpcInvocation) invocation;
+ Object[] args = inv.getArguments();
+ if (args != null) {
+ for (int i = 0; i < args.length; i++) {
+ out.writeObject(args[i]);
+ }
+ }
+ out.writeObject(RpcUtils.getNecessaryAttachments(inv));
+
+ //clean
+ out.flushBuffer();
+ if (out instanceof Cleanable) {
+ ((Cleanable) out).cleanup();
+ }
+ return dataOutputStream.toByteArray();
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java
new file mode 100644
index 0000000..6d480d0
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.AbstractRSocket;
+import io.rsocket.ConnectionSetupPayload;
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.RSocketFactory;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.transport.netty.server.CloseableChannel;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.DefaultPayload;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.protocol.AbstractProtocol;
+import org.apache.dubbo.rpc.support.RpcUtils;
+import org.reactivestreams.Publisher;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+/**
+ * @author sixie.xyn on 2019/1/2.
+ */
+public class RSocketProtocol extends AbstractProtocol {
+
+ public static final String NAME = "rsocket";
+ public static final int DEFAULT_PORT = 30880;
+ private static final Logger log = LoggerFactory.getLogger(RSocketProtocol.class);
+ private static RSocketProtocol INSTANCE;
+
+ // <host:port,CloseableChannel>
+ private final Map<String, CloseableChannel> serverMap = new ConcurrentHashMap<String, CloseableChannel>();
+
+ // <host:port,RSocket>
+ private final Map<String, RSocket> referenceClientMap = new ConcurrentHashMap<String, RSocket>();
+
+ private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
+
+ public RSocketProtocol() {
+ INSTANCE = this;
+ }
+
+ public static RSocketProtocol getRSocketProtocol() {
+ if (INSTANCE == null) {
+ ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(RSocketProtocol.NAME); // load
+ }
+ return INSTANCE;
+ }
+
+ public Collection<Exporter<?>> getExporters() {
+ return Collections.unmodifiableCollection(exporterMap.values());
+ }
+
+ Map<String, Exporter<?>> getExporterMap() {
+ return exporterMap;
+ }
+
+ Invoker<?> getInvoker(int port, Map<String, Object> metadataMap) throws RemotingException {
+ String path = (String) metadataMap.get(RSocketConstants.SERVICE_NAME_KEY);
+ String serviceKey = serviceKey(port, path, (String) metadataMap.get(RSocketConstants.SERVICE_VERSION_KEY), (String) metadataMap.get(Constants.GROUP_KEY));
+ RSocketExporter<?> exporter = (RSocketExporter<?>) exporterMap.get(serviceKey);
+ if (exporter == null) {
+ //throw new Throwable("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
+ throw new RuntimeException("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch ");
+ }
+
+ return exporter.getInvoker();
+ }
+
+ public Collection<Invoker<?>> getInvokers() {
+ return Collections.unmodifiableCollection(invokers);
+ }
+
+ @Override
+ public int getDefaultPort() {
+ return DEFAULT_PORT;
+ }
+
+ @Override
+ public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
+ URL url = invoker.getUrl();
+
+ // export service.
+ String key = serviceKey(url);
+ RSocketExporter<T> exporter = new RSocketExporter<T>(invoker, key, exporterMap);
+ exporterMap.put(key, exporter);
+
+ openServer(url);
+ return exporter;
+ }
+
+ private void openServer(URL url) {
+ String key = url.getAddress();
+ //client can export a service which's only for server to invoke
+ boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
+ if (isServer) {
+ CloseableChannel server = serverMap.get(key);
+ if (server == null) {
+ synchronized (this) {
+ server = serverMap.get(key);
+ if (server == null) {
+ serverMap.put(key, createServer(url));
+ }
+ }
+ }
+ }
+ }
+
+ private CloseableChannel createServer(URL url) {
+ try {
+ String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
+ int bindPort = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
+ if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
+ bindIp = NetUtils.ANYHOST;
+ }
+ return RSocketFactory.receive()
+ .acceptor(new SocketAcceptorImpl(bindPort))
+ .transport(TcpServerTransport.create(bindIp, bindPort))
+ .start()
+ .block();
+ } catch (Throwable e) {
+ throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
+ }
+ }
+
+
+ @Override
+ public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
+ // create rpc invoker.
+ RSocketInvoker<T> invoker = new RSocketInvoker<T>(serviceType, url, getClients(url), invokers);
+ invokers.add(invoker);
+ return invoker;
+ }
+
+ private RSocket[] getClients(URL url) {
+ // whether to share connection
+ boolean service_share_connect = false;
+ int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
+ // if not configured, connection is shared, otherwise, one connection for one service
+ if (connections == 0) {
+ service_share_connect = true;
+ connections = 1;
+ }
+
+ RSocket[] clients = new RSocket[connections];
+ for (int i = 0; i < clients.length; i++) {
+ if (service_share_connect) {
+ clients[i] = getSharedClient(url);
+ } else {
+ clients[i] = initClient(url);
+ }
+ }
+ return clients;
+ }
+
+ /**
+ * Get shared connection
+ */
+ private RSocket getSharedClient(URL url) {
+ String key = url.getAddress();
+ RSocket client = referenceClientMap.get(key);
+ if (client != null) {
+ return client;
+ }
+
+ locks.putIfAbsent(key, new Object());
+ synchronized (locks.get(key)) {
+ if (referenceClientMap.containsKey(key)) {
+ return referenceClientMap.get(key);
+ }
+
+ client = initClient(url);
+ referenceClientMap.put(key, client);
+ locks.remove(key);
+ return client;
+ }
+ }
+
+ /**
+ * Create new connection
+ */
+ private RSocket initClient(URL url) {
+ try {
+ InetSocketAddress serverAddress = new InetSocketAddress(NetUtils.filterLocalHost(url.getHost()), url.getPort());
+ RSocket client = RSocketFactory.connect().keepAliveTickPeriod(Duration.ZERO).keepAliveAckTimeout(Duration.ZERO).acceptor(
+ rSocket ->
+ new AbstractRSocket() {
+ public Mono<Payload> requestResponse(Payload payload) {
+ //TODO support Mono arg
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Flux<Payload> requestStream(Payload payload) {
+ //TODO support Flux arg
+ throw new UnsupportedOperationException();
+ }
+ })
+ .transport(TcpClientTransport.create(serverAddress))
+ .start()
+ .block();
+ return client;
+ } catch (Throwable e) {
+ throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public void destroy() {
+ for (String key : new ArrayList<String>(serverMap.keySet())) {
+ CloseableChannel server = serverMap.remove(key);
+ if (server != null) {
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Close dubbo server: " + server.address());
+ }
+ server.dispose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+ }
+
+ for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
+ RSocket client = referenceClientMap.remove(key);
+ if (client != null) {
+ try {
+// if (logger.isInfoEnabled()) {
+// logger.info("Close dubbo connect: " + client. + "-->" + client.getRemoteAddress());
+// }
+ client.dispose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+ }
+ super.destroy();
+ }
+
+
+ //server process logic
+ private class SocketAcceptorImpl implements SocketAcceptor {
+
+ private final int port;
+
+ public SocketAcceptorImpl(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
+ return Mono.just(
+ new AbstractRSocket() {
+ public Mono<Payload> requestResponse(Payload payload) {
+ try {
+ Map<String, Object> metadata = decodeMetadata(payload);
+ Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
+ Invocation inv = decodeInvocation(payload, metadata, serializeId);
+
+ Result result = inv.getInvoker().invoke(inv);
+
+ Class<?> retType = RpcUtils.getReturnType(inv);
+ //ok
+ if (retType != null && Mono.class.isAssignableFrom(retType)) {
+ Throwable th = result.getException();
+ if (th == null) {
+ Mono bizMono = (Mono) result.getValue();
+ Mono<Payload> retMono = bizMono.map(new Function<Object, Payload>() {
+ @Override
+ public Payload apply(Object o) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) 0);
+ out.writeObject(o);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ Payload responsePayload = DefaultPayload.create(bos.toByteArray());
+ return responsePayload;
+ } catch (Throwable t) {
+ throw Exceptions.propagate(t);
+ }
+ }
+ }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
+ @Override
+ public Publisher<Payload> apply(Throwable throwable) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+ out.writeObject(throwable);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ Payload errorPayload = DefaultPayload.create(bos.toByteArray());
+ return Flux.just(errorPayload);
+ } catch (Throwable t) {
+ throw Exceptions.propagate(t);
+ }
+ }
+ });
+
+ return retMono;
+ } else {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+ out.writeObject(th);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ Payload errorPayload = DefaultPayload.create(bos.toByteArray());
+ return Mono.just(errorPayload);
+ }
+
+ } else {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ int flag = RSocketConstants.FLAG_HAS_ATTACHMENT;
+
+ Throwable th = result.getException();
+ if (th == null) {
+ Object ret = result.getValue();
+ if (ret == null) {
+ flag |= RSocketConstants.FLAG_NULL_VALUE;
+ out.writeByte((byte) flag);
+ } else {
+ out.writeByte((byte) flag);
+ out.writeObject(ret);
+ }
+ } else {
+ flag |= RSocketConstants.FLAG_ERROR;
+ out.writeByte((byte) flag);
+ out.writeObject(th);
+ }
+ out.writeObject(result.getAttachments());
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+
+ Payload responsePayload = DefaultPayload.create(bos.toByteArray());
+ return Mono.just(responsePayload);
+ }
+ } catch (Throwable t) {
+ //application error
+ return Mono.error(t);
+ } finally {
+ payload.release();
+ }
+ }
+
+ public Flux<Payload> requestStream(Payload payload) {
+ try {
+ Map<String, Object> metadata = decodeMetadata(payload);
+ Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
+ Invocation inv = decodeInvocation(payload, metadata, serializeId);
+
+ Result result = inv.getInvoker().invoke(inv);
+ //Class<?> retType = RpcUtils.getReturnType(inv);
+
+ Throwable th = result.getException();
+ if (th != null) {
+ Payload errorPayload = encodeError(th, serializeId);
+ return Flux.just(errorPayload);
+ }
+
+ Flux flux = (Flux) result.getValue();
+ Flux<Payload> retFlux = flux.map(new Function<Object, Payload>() {
+ @Override
+ public Payload apply(Object o) {
+ try {
+ return encodeData(o, serializeId);
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+ }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
+ @Override
+ public Publisher<Payload> apply(Throwable throwable) {
+ try {
+ Payload errorPayload = encodeError(throwable, serializeId);
+ return Flux.just(errorPayload);
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+ });
+ return retFlux;
+ } catch (Throwable t) {
+ return Flux.error(t);
+ } finally {
+ payload.release();
+ }
+ }
+
+ private Payload encodeData(Object data, byte serializeId) throws Throwable {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) 0);
+ out.writeObject(data);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ return DefaultPayload.create(bos.toByteArray());
+ }
+
+ private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+ out.writeObject(throwable);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ return DefaultPayload.create(bos.toByteArray());
+ }
+
+ private Map<String, Object> decodeMetadata(Payload payload) throws IOException {
+ ByteBuffer metadataBuffer = payload.getMetadata();
+ byte[] metadataBytes = new byte[metadataBuffer.remaining()];
+ metadataBuffer.get(metadataBytes, metadataBuffer.position(), metadataBuffer.remaining());
+ return MetadataCodec.decodeMetadata(metadataBytes);
+ }
+
+ private Invocation decodeInvocation(Payload payload, Map<String, Object> metadata, Byte serializeId) throws RemotingException, IOException, ClassNotFoundException {
+ Invoker<?> invoker = getInvoker(port, metadata);
+
+ String serviceName = (String) metadata.get(RSocketConstants.SERVICE_NAME_KEY);
+ String version = (String) metadata.get(RSocketConstants.SERVICE_VERSION_KEY);
+ String methodName = (String) metadata.get(RSocketConstants.METHOD_NAME_KEY);
+ String paramType = (String) metadata.get(RSocketConstants.PARAM_TYPE_KEY);
+
+ ByteBuffer dataBuffer = payload.getData();
+ byte[] dataBytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+
+
+ //TODO how to get remote address
+ //RpcContext rpcContext = RpcContext.getContext();
+ //rpcContext.setRemoteAddress(channel.getRemoteAddress());
+
+
+ RpcInvocation inv = new RpcInvocation();
+ inv.setInvoker(invoker);
+ inv.setAttachment(Constants.PATH_KEY, serviceName);
+ inv.setAttachment(Constants.VERSION_KEY, version);
+ inv.setMethodName(methodName);
+
+
+ InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+ ObjectInput in = CodecSupport.getSerializationById(serializeId).deserialize(null, dataInputStream);
+
+ Object[] args;
+ Class<?>[] pts;
+ String desc = paramType;
+ if (desc.length() == 0) {
+ pts = new Class<?>[0];
+ args = new Object[0];
+ } else {
+ pts = ReflectUtils.desc2classArray(desc);
+ args = new Object[pts.length];
+ for (int i = 0; i < args.length; i++) {
+ try {
+ args[i] = in.readObject(pts[i]);
+ } catch (Exception e) {
+ if (log.isWarnEnabled()) {
+ log.warn("Decode argument failed: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ inv.setParameterTypes(pts);
+ inv.setArguments(args);
+ Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
+ if (map != null && map.size() > 0) {
+ inv.addAttachments(map);
+ }
+ return inv;
+ }
+ });
+ }
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol b/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
new file mode 100644
index 0000000..4f03810
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
@@ -0,0 +1 @@
+rsocket=org.apache.dubbo.rpc.protocol.rsocket.RSocketProtocol
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java
new file mode 100644
index 0000000..d73bbae
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.apache.dubbo.rpc.service.DemoService;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Consumer;
+
+public class ConsumerDemo {
+
+ public static void main(String[] args) {
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-rsocket-consumer.xml"});
+ context.start();
+ DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
+
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ Mono<String> resultMono = demoService.requestMono("world"); // call remote method
+ resultMono.doOnNext(new Consumer<String>() {
+ @Override
+ public void accept(String s) {
+ System.out.println(s); // get result
+ }
+ }).block();
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+
+ }
+
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java
new file mode 100644
index 0000000..2e7466d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class ProviderDemo {
+
+ public static void main(String[] args) throws Exception {
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-rsocket-provider.xml"});
+ context.start();
+ System.in.read(); // press any key to exit
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java
new file mode 100644
index 0000000..e34a6f7
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.ProxyFactory;
+import org.apache.dubbo.rpc.service.DemoException;
+import org.apache.dubbo.rpc.service.DemoService;
+import org.apache.dubbo.rpc.service.DemoServiceImpl;
+import org.apache.dubbo.rpc.service.EchoService;
+import org.apache.dubbo.rpc.service.RemoteService;
+import org.apache.dubbo.rpc.service.RemoteServiceImpl;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+public class RSocketProtocolTest {
+
+ private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
+ private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
+
+ @AfterClass
+ public static void after() {
+ RSocketProtocol.getRSocketProtocol().destroy();
+ }
+
+ @Test
+ public void testDemoProtocol() throws Exception {
+ DemoService service = new DemoServiceImpl();
+ protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName())));
+ service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+ assertEquals(service.getSize(new String[]{"", "", ""}), 3);
+ }
+
+ @Test
+ public void testDubboProtocol() throws Exception {
+ DemoService service = new DemoServiceImpl();
+ protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName())));
+ service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+
+ assertEquals(service.getSize(null), -1);
+ assertEquals(service.getSize(new String[]{"", "", ""}), 3);
+
+
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("aa", "bb");
+ Set<String> set = service.keys(map);
+ assertEquals(set.size(), 1);
+ assertEquals(set.iterator().next(), "aa");
+ service.invoke("rsocket://127.0.0.1:9010/" + DemoService.class.getName() + "", "invoke");
+
+ StringBuffer buf = new StringBuffer();
+ for (int i = 0; i < 1024 * 32 + 32; i++)
+ buf.append('A');
+ System.out.println(service.stringLength(buf.toString()));
+
+ // cast to EchoService
+ EchoService echo = proxy.getProxy(protocol.refer(EchoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+ assertEquals(echo.$echo(buf.toString()), buf.toString());
+ assertEquals(echo.$echo("test"), "test");
+ assertEquals(echo.$echo("abcdefg"), "abcdefg");
+ assertEquals(echo.$echo(1234), 1234);
+ }
+
+
+ @Test
+ public void testDubboProtocolThrowable() throws Exception {
+ DemoService service = new DemoServiceImpl();
+ protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName())));
+ service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+ try {
+ service.errorTest("mike");
+ } catch (Throwable t) {
+ assertEquals(t.getClass(), ArithmeticException.class);
+ }
+ }
+
+ @Test
+ public void testDubboProtocolMultiService() throws Exception {
+ DemoService service = new DemoServiceImpl();
+ protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName())));
+ service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+
+ RemoteService remote = new RemoteServiceImpl();
+ protocol.export(proxy.getInvoker(remote, RemoteService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + RemoteService.class.getName())));
+ remote = proxy.getProxy(protocol.refer(RemoteService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + RemoteService.class.getName()).addParameter("timeout", 3000l)));
+
+ service.sayHello("world");
+
+ // test netty client
+ assertEquals("world", service.echo("world"));
+ assertEquals("hello world", remote.sayHello("world"));
+
+ EchoService serviceEcho = (EchoService) service;
+ assertEquals(serviceEcho.$echo("test"), "test");
+
+ EchoService remoteEecho = (EchoService) remote;
+ assertEquals(remoteEecho.$echo("ok"), "ok");
+ }
+
+
+ @Test
+ public void testRequestMono() throws Exception {
+ DemoService service = new DemoServiceImpl();
+ protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName())));
+ service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+ Mono<String> result = service.requestMono("mike");
+
+ result.doOnNext(new Consumer<String>() {
+ @Override
+ public void accept(String s) {
+ assertEquals(s, "hello mike");
+ System.out.println(s);
+ }
+ }).block();
+
+ Mono<String> result2 = service.requestMonoOnError("mike");
+ result2.onErrorResume(DemoException.class, new Function<DemoException, Mono<String>>() {
+ @Override
+ public Mono<String> apply(DemoException e) {
+ return Mono.just(e.getClass().getName());
+ }
+ }).doOnNext(new Consumer<String>() {
+ @Override
+ public void accept(String s) {
+ assertEquals(DemoException.class.getName(), s);
+ }
+ }).block();
+
+ Mono<String> result3 = service.requestMonoBizError("mike");
+ result3.onErrorResume(ArithmeticException.class, new Function<ArithmeticException, Mono<String>>() {
+ @Override
+ public Mono<String> apply(ArithmeticException e) {
+ return Mono.just(e.getClass().getName());
+ }
+ }).doOnNext(new Consumer<String>() {
+ @Override
+ public void accept(String s) {
+ assertEquals(ArithmeticException.class.getName(), s);
+ }
+ }).block();
+
+ }
+
+ @Test
+ public void testRequestFlux() throws Exception {
+ DemoService service = new DemoServiceImpl();
+ protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName())));
+ service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l)));
+
+ {
+ Flux<String> result = service.requestFlux("mike");
+ result.doOnNext(new Consumer<String>() {
+ @Override
+ public void accept(String s) {
+ System.out.println(s);
+ }
+ }).blockLast();
+ }
+
+
+ {
+ Flux<String> result2 = service.requestFluxOnError("mike");
+ result2.onErrorResume(new Function<Throwable, Publisher<? extends String>>() {
+ @Override
+ public Publisher<? extends String> apply(Throwable throwable) {
+ return Flux.just(throwable.getClass().getName());
+ }
+ }).takeLast(1).doOnNext(new Consumer<String>() {
+ @Override
+ public void accept(String s) {
+ assertEquals(DemoException.class.getName(), s);
+ }
+ }).blockLast();
+ }
+
+ {
+ Flux<String> result3 = service.requestFluxBizError("mike");
+ result3.onErrorResume(new Function<Throwable, Publisher<? extends String>>() {
+ @Override
+ public Publisher<? extends String> apply(Throwable throwable) {
+ return Flux.just(throwable.getClass().getName());
+ }
+ }).takeLast(1).doOnNext(new Consumer<String>() {
+ @Override
+ public void accept(String s) {
+ assertEquals(ArithmeticException.class.getName(), s);
+ }
+ }).blockLast();
+ }
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java
new file mode 100644
index 0000000..33f3a2e
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+public class DemoException extends Exception {
+
+ private static final long serialVersionUID = -8213943026163641747L;
+
+ public DemoException() {
+ super();
+ }
+
+ public DemoException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DemoException(String message) {
+ super(message);
+ }
+
+ public DemoException(Throwable cause) {
+ super(cause);
+ }
+
+}
+
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java
new file mode 100644
index 0000000..b2b37b4
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface DemoService {
+ String sayHello(String name);
+
+ Set<String> keys(Map<String, String> map);
+
+ String echo(String text);
+
+ Map echo(Map map);
+
+ long timestamp();
+
+ String getThreadName();
+
+ int getSize(String[] strs);
+
+ int getSize(Object[] os);
+
+ Object invoke(String service, String method) throws Exception;
+
+ int stringLength(String str);
+
+ byte getbyte(byte arg);
+
+ long add(int a, long b);
+
+ String errorTest(String name);
+
+ Mono<String> requestMono(String name);
+
+ Mono<String> requestMonoOnError(String name);
+
+ Mono<String> requestMonoBizError(String name);
+
+ Flux<String> requestFlux(String name);
+
+ Flux<String> requestFluxOnError(String name);
+
+ Flux<String> requestFluxBizError(String name);
+
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java
new file mode 100644
index 0000000..b67e3e0
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import org.apache.dubbo.rpc.RpcContext;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public class DemoServiceImpl implements DemoService {
+ public DemoServiceImpl() {
+ super();
+ }
+
+ public String sayHello(String name) {
+ return "hello " + name;
+ }
+
+ public String echo(String text) {
+ return text;
+ }
+
+ public Map echo(Map map) {
+ return map;
+ }
+
+ public long timestamp() {
+ return System.currentTimeMillis();
+ }
+
+ public String getThreadName() {
+ return Thread.currentThread().getName();
+ }
+
+ public int getSize(String[] strs) {
+ if (strs == null)
+ return -1;
+ return strs.length;
+ }
+
+ public int getSize(Object[] os) {
+ if (os == null)
+ return -1;
+ return os.length;
+ }
+
+ public Object invoke(String service, String method) throws Exception {
+ System.out.println("RpcContext.getContext().getRemoteHost()=" + RpcContext.getContext().getRemoteHost());
+ return service + ":" + method;
+ }
+
+ public int stringLength(String str) {
+ return str.length();
+ }
+
+
+ public byte getbyte(byte arg) {
+ return arg;
+ }
+
+
+ public Set<String> keys(Map<String, String> map) {
+ return map == null ? null : map.keySet();
+ }
+
+
+ public long add(int a, long b) {
+ return a + b;
+ }
+
+ @Override
+ public String errorTest(String name) {
+ int a = 1 / 0;
+ return null;
+ }
+
+ public Mono<String> requestMono(String name) {
+ return Mono.just("hello " + name);
+ }
+
+ public Mono<String> requestMonoOnError(String name) {
+ return Mono.error(new DemoException(name));
+ }
+
+ public Mono<String> requestMonoBizError(String name) {
+ int a = 1 / 0;
+ return Mono.just("hello " + name);
+ }
+
+ @Override
+ public Flux<String> requestFlux(String name) {
+
+ return Flux.create(new Consumer<FluxSink<String>>() {
+ @Override
+ public void accept(FluxSink<String> fluxSink) {
+ for (int i = 0; i < 5; i++) {
+ fluxSink.next(name + " " + i);
+ }
+ fluxSink.complete();
+ }
+ });
+
+ }
+
+ @Override
+ public Flux<String> requestFluxOnError(String name) {
+
+ return Flux.create(new Consumer<FluxSink<String>>() {
+ @Override
+ public void accept(FluxSink<String> fluxSink) {
+ for (int i = 0; i < 5; i++) {
+ fluxSink.next(name + " " + i);
+ }
+ fluxSink.error(new DemoException());
+ }
+ });
+
+ }
+
+ @Override
+ public Flux<String> requestFluxBizError(String name) {
+ int a = 1 / 0;
+ return Flux.create(new Consumer<FluxSink<String>>() {
+ @Override
+ public void accept(FluxSink<String> fluxSink) {
+ for (int i = 0; i < 5; i++) {
+ fluxSink.next(name + " " + i);
+ }
+ fluxSink.error(new DemoException());
+ }
+ });
+ }
+
+}
+
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java
new file mode 100644
index 0000000..d3e21dc
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import java.rmi.RemoteException;
+
+public interface RemoteService {
+ String sayHello(String name) throws RemoteException;
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java
new file mode 100644
index 0000000..afb5788
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.service;
+
+import java.rmi.RemoteException;
+
+public class RemoteServiceImpl implements RemoteService {
+ @Override
+ public String sayHello(String name) throws RemoteException {
+ return "hello " + name;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml
new file mode 100644
index 0000000..3c5d2ba
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <!-- ===================================================================== -->
+ <!-- 以下是appender的定义 -->
+ <!-- ===================================================================== -->
+ <appender name="dubbo" class="org.apache.dubbo.common.utils.DubboAppender">
+ <param name="encoding" value="GBK"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %p [%c:%M] - %m%n"/>
+ </layout>
+ <!-- <filter class="org.apache.log4j.varia.LevelRangeFilter">
+ <param name="LevelMin" value="DEBUG" />
+ <param name="LevelMax" value="DEBUG" />
+ </filter> -->
+ </appender>
+ <appender name="FILE" class="org.apache.log4j.FileAppender">
+ <param name="File" value="dubbo.log"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- <param name="ConversionPattern" value="[%t %d{dd/MM/yy HH:mm:ss:SSS
+ z}] %5p %c{2}: %L %m%n" /> -->
+ <param name="ConversionPattern" value="[%t %l %d{dd/MM/yy HH:mm:ss:SSS z}] %5p %m %n"/>
+ </layout>
+ </appender>
+ <root>
+ <level value="INFO"/>
+ <appender-ref ref="dubbo"/>
+ <appender-ref ref="FILE"/>
+ </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml
new file mode 100644
index 0000000..f0f25cf
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
+ xmlns="http://www.springframework.org/schema/beans"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
+
+ <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),
+ don't set it same as provider -->
+ <dubbo:application name="demo-consumer"/>
+
+ <!-- use multicast registry center to discover service -->
+ <dubbo:registry address="multicast://224.5.6.7:1234"/>
+
+ <!-- generate proxy for the remote service, then demoService can be used in the same way as the
+ local regular interface -->
+ <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.rpc.service.DemoService"/>
+
+</beans>
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml
new file mode 100644
index 0000000..e84fb70
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
+ xmlns="http://www.springframework.org/schema/beans"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
+
+ <!-- provider's application name, used for tracing dependency relationship -->
+ <dubbo:application name="demo-provider"/>
+
+ <!-- use multicast registry center to export service -->
+ <dubbo:registry address="multicast://224.5.6.7:1234"/>
+
+ <!-- use rsocket protocol to export service on port 20880 -->
+ <dubbo:protocol name="rsocket" port="20890"/>
+
+ <!-- service implementation, as same as regular local bean -->
+ <bean id="demoService" class="org.apache.dubbo.rpc.service.DemoServiceImpl"/>
+
+ <!-- declare the service interface to be exported -->
+ <dubbo:service interface="org.apache.dubbo.rpc.service.DemoService" ref="demoService"/>
+
+</beans>
\ No newline at end of file
diff --git a/dubbo-rpc/pom.xml b/dubbo-rpc/pom.xml
index 9a2cb64..184bc30 100644
--- a/dubbo-rpc/pom.xml
+++ b/dubbo-rpc/pom.xml
@@ -40,5 +40,6 @@
<module>dubbo-rpc-memcached</module>
<module>dubbo-rpc-redis</module>
<module>dubbo-rpc-rest</module>
+ <module>dubbo-rpc-rsocket</module>
</modules>
</project>