You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/03/11 06:43:07 UTC
[incubator-dubbo] branch 3.x-dev updated: Merge pull request #3626,
remove author.
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.x-dev
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/3.x-dev by this push:
new 6272691 Merge pull request #3626, remove author.
6272691 is described below
commit 6272691ca6ef10b3db4f1fb6ac3611d5c125f88f
Author: uglycow <xi...@gmail.com>
AuthorDate: Mon Mar 11 14:43:01 2019 +0800
Merge pull request #3626, remove author.
---
.../dubbo/rpc/protocol/rsocket/MetadataCodec.java | 75 +-
.../rpc/protocol/rsocket/RSocketConstants.java | 67 +-
.../rpc/protocol/rsocket/RSocketExporter.java | 89 +-
.../dubbo/rpc/protocol/rsocket/RSocketInvoker.java | 499 +++++----
.../rpc/protocol/rsocket/RSocketProtocol.java | 1063 ++++++++++----------
5 files changed, 889 insertions(+), 904 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java
index ce2e1b5..8a62597 100644
--- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java
@@ -1,39 +1,36 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.rsocket;
-
-import com.alibaba.fastjson.JSON;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-/**
- * @author sixie.xyn on 2019/1/3.
- */
-public class MetadataCodec {
-
- public static Map<String, Object> decodeMetadata(byte[] bytes) throws IOException {
- return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Map.class);
- }
-
- public static byte[] encodeMetadata(Map<String, Object> metadata) throws IOException {
- String jsonStr = JSON.toJSONString(metadata);
- return jsonStr.getBytes(StandardCharsets.UTF_8);
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import com.alibaba.fastjson.JSON;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class MetadataCodec {
+
+ public static Map<String, Object> decodeMetadata(byte[] bytes) throws IOException {
+ return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Map.class);
+ }
+
+ public static byte[] encodeMetadata(Map<String, Object> metadata) throws IOException {
+ String jsonStr = JSON.toJSONString(metadata);
+ return jsonStr.getBytes(StandardCharsets.UTF_8);
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java
index e6ad98a..a252dbd 100644
--- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java
@@ -1,35 +1,32 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.rsocket;
-
-/**
- * @author sixie.xyn on 2019/1/3.
- */
-public class RSocketConstants {
-
- public static final String SERVICE_NAME_KEY = "_service_name";
- public static final String SERVICE_VERSION_KEY = "_service_version";
- public static final String METHOD_NAME_KEY = "_method_name";
- public static final String PARAM_TYPE_KEY = "_param_type";
- public static final String SERIALIZE_TYPE_KEY = "_serialize_type";
- public static final String TIMEOUT_KEY = "_timeout";
-
-
- public static final int FLAG_ERROR = 0x01;
- public static final int FLAG_NULL_VALUE = 0x02;
- public static final int FLAG_HAS_ATTACHMENT = 0x04;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+public class RSocketConstants {
+
+ public static final String SERVICE_NAME_KEY = "_service_name";
+ public static final String SERVICE_VERSION_KEY = "_service_version";
+ public static final String METHOD_NAME_KEY = "_method_name";
+ public static final String PARAM_TYPE_KEY = "_param_type";
+ public static final String SERIALIZE_TYPE_KEY = "_serialize_type";
+ public static final String TIMEOUT_KEY = "_timeout";
+
+
+ public static final int FLAG_ERROR = 0x01;
+ public static final int FLAG_NULL_VALUE = 0x02;
+ public static final int FLAG_HAS_ATTACHMENT = 0x04;
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java
index 074085e..3ebaea2 100644
--- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java
@@ -1,46 +1,43 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.rsocket;
-
-import org.apache.dubbo.rpc.Exporter;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.protocol.AbstractExporter;
-
-import java.util.Map;
-
-/**
- * @author sixie.xyn on 2019/1/2.
- */
-public class RSocketExporter<T> extends AbstractExporter<T> {
-
- private final String key;
-
- private final Map<String, Exporter<?>> exporterMap;
-
- public RSocketExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
- super(invoker);
- this.key = key;
- this.exporterMap = exporterMap;
- }
-
- @Override
- public void unexport() {
- super.unexport();
- exporterMap.remove(key);
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.protocol.AbstractExporter;
+
+import java.util.Map;
+
+public class RSocketExporter<T> extends AbstractExporter<T> {
+
+ private final String key;
+
+ private final Map<String, Exporter<?>> exporterMap;
+
+ public RSocketExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
+ super(invoker);
+ this.key = key;
+ this.exporterMap = exporterMap;
+ }
+
+ @Override
+ public void unexport() {
+ super.unexport();
+ exporterMap.remove(key);
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java
index bc5cf43..98c7d87 100644
--- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java
@@ -1,251 +1,248 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.rsocket;
-
-import io.rsocket.Payload;
-import io.rsocket.RSocket;
-import io.rsocket.util.DefaultPayload;
-import org.apache.dubbo.common.Constants;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.serialize.Cleanable;
-import org.apache.dubbo.common.serialize.ObjectInput;
-import org.apache.dubbo.common.serialize.ObjectOutput;
-import org.apache.dubbo.common.serialize.Serialization;
-import org.apache.dubbo.common.utils.AtomicPositiveInteger;
-import org.apache.dubbo.common.utils.ReflectUtils;
-import org.apache.dubbo.remoting.transport.CodecSupport;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.RpcResult;
-import org.apache.dubbo.rpc.protocol.AbstractInvoker;
-import org.apache.dubbo.rpc.support.RpcUtils;
-import reactor.core.Exceptions;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-
-/**
- * @author sixie.xyn on 2019/1/2.
- */
-public class RSocketInvoker<T> extends AbstractInvoker<T> {
-
- private final RSocket[] clients;
-
- private final AtomicPositiveInteger index = new AtomicPositiveInteger();
-
- private final String version;
-
- private final ReentrantLock destroyLock = new ReentrantLock();
-
- private final Set<Invoker<?>> invokers;
-
- private final Serialization serialization;
-
- public RSocketInvoker(Class<T> serviceType, URL url, RSocket[] clients, Set<Invoker<?>> invokers) {
- super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
- this.clients = clients;
- // get version.
- this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
- this.invokers = invokers;
-
- this.serialization = CodecSupport.getSerialization(getUrl());
- }
-
- @Override
- protected Result doInvoke(final Invocation invocation) throws Throwable {
- RpcInvocation inv = (RpcInvocation) invocation;
- final String methodName = RpcUtils.getMethodName(invocation);
- inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
- inv.setAttachment(Constants.VERSION_KEY, version);
-
- RSocket currentClient;
- if (clients.length == 1) {
- currentClient = clients[0];
- } else {
- currentClient = clients[index.getAndIncrement() % clients.length];
- }
- try {
- //TODO support timeout
- int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
-
- RpcContext.getContext().setFuture(null);
- //encode inv: metadata and data(arg,attachment)
- Payload requestPayload = encodeInvocation(invocation);
-
- Class<?> retType = RpcUtils.getReturnType(invocation);
-
- if (retType != null && retType.isAssignableFrom(Mono.class)) {
- Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
- Mono<Object> bizMono = responseMono.map(new Function<Payload, Object>() {
- @Override
- public Object apply(Payload payload) {
- return decodeData(payload);
- }
- });
- RpcResult rpcResult = new RpcResult();
- rpcResult.setValue(bizMono);
- return rpcResult;
- } else if (retType != null && retType.isAssignableFrom(Flux.class)) {
- return requestStream(currentClient, requestPayload);
- } else {
- //request-reponse
- Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
- FutureSubscriber futureSubscriber = new FutureSubscriber(serialization, retType);
- responseMono.subscribe(futureSubscriber);
- return (Result) futureSubscriber.get();
- }
-
- //TODO support stream arg
- } catch (Throwable t) {
- throw new RpcException(t);
- }
- }
-
-
- private Result requestStream(RSocket currentClient, Payload requestPayload) {
- Flux<Payload> responseFlux = currentClient.requestStream(requestPayload);
- Flux<Object> retFlux = responseFlux.map(new Function<Payload, Object>() {
-
- @Override
- public Object apply(Payload payload) {
- return decodeData(payload);
- }
- });
-
- RpcResult rpcResult = new RpcResult();
- rpcResult.setValue(retFlux);
- return rpcResult;
- }
-
-
- private Object decodeData(Payload payload) {
- try {
- //TODO save the copy
- ByteBuffer dataBuffer = payload.getData();
- byte[] dataBytes = new byte[dataBuffer.remaining()];
- dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
- InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
- ObjectInput in = serialization.deserialize(null, dataInputStream);
- int flag = in.readByte();
- if ((flag & RSocketConstants.FLAG_ERROR) != 0) {
- Throwable t = (Throwable) in.readObject();
- throw t;
- } else {
- return in.readObject();
- }
- } catch (Throwable t) {
- throw Exceptions.propagate(t);
- }
- }
-
- @Override
- public boolean isAvailable() {
- if (!super.isAvailable()) {
- return false;
- }
- for (RSocket client : clients) {
- if (client.availability() > 0) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void destroy() {
- // in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every
- // time when client.close() is called, counter counts down once, and when counter reaches zero, client will be
- // closed.
- if (super.isDestroyed()) {
- return;
- } else {
- // double check to avoid dup close
- destroyLock.lock();
- try {
- if (super.isDestroyed()) {
- return;
- }
- super.destroy();
- if (invokers != null) {
- invokers.remove(this);
- }
- for (RSocket client : clients) {
- try {
- client.dispose();
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- }
-
- } finally {
- destroyLock.unlock();
- }
- }
- }
-
- private Payload encodeInvocation(Invocation invocation) throws IOException {
- byte[] metadata = encodeMetadata(invocation);
- byte[] data = encodeData(invocation);
- return DefaultPayload.create(data, metadata);
- }
-
- private byte[] encodeMetadata(Invocation invocation) throws IOException {
- Map<String, Object> metadataMap = new HashMap<String, Object>();
- metadataMap.put(RSocketConstants.SERVICE_NAME_KEY, invocation.getAttachment(Constants.PATH_KEY));
- metadataMap.put(RSocketConstants.SERVICE_VERSION_KEY, invocation.getAttachment(Constants.VERSION_KEY));
- metadataMap.put(RSocketConstants.METHOD_NAME_KEY, invocation.getMethodName());
- metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(invocation.getParameterTypes()));
- metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, (Byte) serialization.getContentTypeId());
- return MetadataCodec.encodeMetadata(metadataMap);
- }
-
-
- private byte[] encodeData(Invocation invocation) throws IOException {
- ByteArrayOutputStream dataOutputStream = new ByteArrayOutputStream();
- Serialization serialization = CodecSupport.getSerialization(getUrl());
- ObjectOutput out = serialization.serialize(getUrl(), dataOutputStream);
- RpcInvocation inv = (RpcInvocation) invocation;
- Object[] args = inv.getArguments();
- if (args != null) {
- for (int i = 0; i < args.length; i++) {
- out.writeObject(args[i]);
- }
- }
- out.writeObject(RpcUtils.getNecessaryAttachments(inv));
-
- //clean
- out.flushBuffer();
- if (out instanceof Cleanable) {
- ((Cleanable) out).cleanup();
- }
- return dataOutputStream.toByteArray();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.util.DefaultPayload;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.serialize.Cleanable;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.utils.AtomicPositiveInteger;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.RpcResult;
+import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+import org.apache.dubbo.rpc.support.RpcUtils;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+public class RSocketInvoker<T> extends AbstractInvoker<T> {
+
+ private final RSocket[] clients;
+
+ private final AtomicPositiveInteger index = new AtomicPositiveInteger();
+
+ private final String version;
+
+ private final ReentrantLock destroyLock = new ReentrantLock();
+
+ private final Set<Invoker<?>> invokers;
+
+ private final Serialization serialization;
+
+ public RSocketInvoker(Class<T> serviceType, URL url, RSocket[] clients, Set<Invoker<?>> invokers) {
+ super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
+ this.clients = clients;
+ // get version.
+ this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
+ this.invokers = invokers;
+
+ this.serialization = CodecSupport.getSerialization(getUrl());
+ }
+
+ @Override
+ protected Result doInvoke(final Invocation invocation) throws Throwable {
+ RpcInvocation inv = (RpcInvocation) invocation;
+ final String methodName = RpcUtils.getMethodName(invocation);
+ inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
+ inv.setAttachment(Constants.VERSION_KEY, version);
+
+ RSocket currentClient;
+ if (clients.length == 1) {
+ currentClient = clients[0];
+ } else {
+ currentClient = clients[index.getAndIncrement() % clients.length];
+ }
+ try {
+ //TODO support timeout
+ int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
+
+ RpcContext.getContext().setFuture(null);
+ //encode inv: metadata and data(arg,attachment)
+ Payload requestPayload = encodeInvocation(invocation);
+
+ Class<?> retType = RpcUtils.getReturnType(invocation);
+
+ if (retType != null && retType.isAssignableFrom(Mono.class)) {
+ Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
+ Mono<Object> bizMono = responseMono.map(new Function<Payload, Object>() {
+ @Override
+ public Object apply(Payload payload) {
+ return decodeData(payload);
+ }
+ });
+ RpcResult rpcResult = new RpcResult();
+ rpcResult.setValue(bizMono);
+ return rpcResult;
+ } else if (retType != null && retType.isAssignableFrom(Flux.class)) {
+ return requestStream(currentClient, requestPayload);
+ } else {
+ //request-reponse
+ Mono<Payload> responseMono = currentClient.requestResponse(requestPayload);
+ FutureSubscriber futureSubscriber = new FutureSubscriber(serialization, retType);
+ responseMono.subscribe(futureSubscriber);
+ return (Result) futureSubscriber.get();
+ }
+
+ //TODO support stream arg
+ } catch (Throwable t) {
+ throw new RpcException(t);
+ }
+ }
+
+
+ private Result requestStream(RSocket currentClient, Payload requestPayload) {
+ Flux<Payload> responseFlux = currentClient.requestStream(requestPayload);
+ Flux<Object> retFlux = responseFlux.map(new Function<Payload, Object>() {
+
+ @Override
+ public Object apply(Payload payload) {
+ return decodeData(payload);
+ }
+ });
+
+ RpcResult rpcResult = new RpcResult();
+ rpcResult.setValue(retFlux);
+ return rpcResult;
+ }
+
+
+ private Object decodeData(Payload payload) {
+ try {
+ //TODO save the copy
+ ByteBuffer dataBuffer = payload.getData();
+ byte[] dataBytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+ InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+ ObjectInput in = serialization.deserialize(null, dataInputStream);
+ int flag = in.readByte();
+ if ((flag & RSocketConstants.FLAG_ERROR) != 0) {
+ Throwable t = (Throwable) in.readObject();
+ throw t;
+ } else {
+ return in.readObject();
+ }
+ } catch (Throwable t) {
+ throw Exceptions.propagate(t);
+ }
+ }
+
+ @Override
+ public boolean isAvailable() {
+ if (!super.isAvailable()) {
+ return false;
+ }
+ for (RSocket client : clients) {
+ if (client.availability() > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void destroy() {
+ // in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every
+ // time when client.close() is called, counter counts down once, and when counter reaches zero, client will be
+ // closed.
+ if (super.isDestroyed()) {
+ return;
+ } else {
+ // double check to avoid dup close
+ destroyLock.lock();
+ try {
+ if (super.isDestroyed()) {
+ return;
+ }
+ super.destroy();
+ if (invokers != null) {
+ invokers.remove(this);
+ }
+ for (RSocket client : clients) {
+ try {
+ client.dispose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+
+ } finally {
+ destroyLock.unlock();
+ }
+ }
+ }
+
+ private Payload encodeInvocation(Invocation invocation) throws IOException {
+ byte[] metadata = encodeMetadata(invocation);
+ byte[] data = encodeData(invocation);
+ return DefaultPayload.create(data, metadata);
+ }
+
+ private byte[] encodeMetadata(Invocation invocation) throws IOException {
+ Map<String, Object> metadataMap = new HashMap<String, Object>();
+ metadataMap.put(RSocketConstants.SERVICE_NAME_KEY, invocation.getAttachment(Constants.PATH_KEY));
+ metadataMap.put(RSocketConstants.SERVICE_VERSION_KEY, invocation.getAttachment(Constants.VERSION_KEY));
+ metadataMap.put(RSocketConstants.METHOD_NAME_KEY, invocation.getMethodName());
+ metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(invocation.getParameterTypes()));
+ metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, (Byte) serialization.getContentTypeId());
+ return MetadataCodec.encodeMetadata(metadataMap);
+ }
+
+
+ private byte[] encodeData(Invocation invocation) throws IOException {
+ ByteArrayOutputStream dataOutputStream = new ByteArrayOutputStream();
+ Serialization serialization = CodecSupport.getSerialization(getUrl());
+ ObjectOutput out = serialization.serialize(getUrl(), dataOutputStream);
+ RpcInvocation inv = (RpcInvocation) invocation;
+ Object[] args = inv.getArguments();
+ if (args != null) {
+ for (int i = 0; i < args.length; i++) {
+ out.writeObject(args[i]);
+ }
+ }
+ out.writeObject(RpcUtils.getNecessaryAttachments(inv));
+
+ //clean
+ out.flushBuffer();
+ if (out instanceof Cleanable) {
+ ((Cleanable) out).cleanup();
+ }
+ return dataOutputStream.toByteArray();
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java
index 6d480d0..c64d517 100644
--- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java
@@ -1,533 +1,530 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.rsocket;
-
-import io.rsocket.AbstractRSocket;
-import io.rsocket.ConnectionSetupPayload;
-import io.rsocket.Payload;
-import io.rsocket.RSocket;
-import io.rsocket.RSocketFactory;
-import io.rsocket.SocketAcceptor;
-import io.rsocket.transport.netty.client.TcpClientTransport;
-import io.rsocket.transport.netty.server.CloseableChannel;
-import io.rsocket.transport.netty.server.TcpServerTransport;
-import io.rsocket.util.DefaultPayload;
-import org.apache.dubbo.common.Constants;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.extension.ExtensionLoader;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.serialize.ObjectInput;
-import org.apache.dubbo.common.serialize.ObjectOutput;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.common.utils.ReflectUtils;
-import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.transport.CodecSupport;
-import org.apache.dubbo.rpc.Exporter;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Protocol;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.protocol.AbstractProtocol;
-import org.apache.dubbo.rpc.support.RpcUtils;
-import org.reactivestreams.Publisher;
-import reactor.core.Exceptions;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.Function;
-
-/**
- * @author sixie.xyn on 2019/1/2.
- */
-public class RSocketProtocol extends AbstractProtocol {
-
- public static final String NAME = "rsocket";
- public static final int DEFAULT_PORT = 30880;
- private static final Logger log = LoggerFactory.getLogger(RSocketProtocol.class);
- private static RSocketProtocol INSTANCE;
-
- // <host:port,CloseableChannel>
- private final Map<String, CloseableChannel> serverMap = new ConcurrentHashMap<String, CloseableChannel>();
-
- // <host:port,RSocket>
- private final Map<String, RSocket> referenceClientMap = new ConcurrentHashMap<String, RSocket>();
-
- private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
-
- public RSocketProtocol() {
- INSTANCE = this;
- }
-
- public static RSocketProtocol getRSocketProtocol() {
- if (INSTANCE == null) {
- ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(RSocketProtocol.NAME); // load
- }
- return INSTANCE;
- }
-
- public Collection<Exporter<?>> getExporters() {
- return Collections.unmodifiableCollection(exporterMap.values());
- }
-
- Map<String, Exporter<?>> getExporterMap() {
- return exporterMap;
- }
-
- Invoker<?> getInvoker(int port, Map<String, Object> metadataMap) throws RemotingException {
- String path = (String) metadataMap.get(RSocketConstants.SERVICE_NAME_KEY);
- String serviceKey = serviceKey(port, path, (String) metadataMap.get(RSocketConstants.SERVICE_VERSION_KEY), (String) metadataMap.get(Constants.GROUP_KEY));
- RSocketExporter<?> exporter = (RSocketExporter<?>) exporterMap.get(serviceKey);
- if (exporter == null) {
- //throw new Throwable("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
- throw new RuntimeException("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch ");
- }
-
- return exporter.getInvoker();
- }
-
- public Collection<Invoker<?>> getInvokers() {
- return Collections.unmodifiableCollection(invokers);
- }
-
- @Override
- public int getDefaultPort() {
- return DEFAULT_PORT;
- }
-
- @Override
- public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
- URL url = invoker.getUrl();
-
- // export service.
- String key = serviceKey(url);
- RSocketExporter<T> exporter = new RSocketExporter<T>(invoker, key, exporterMap);
- exporterMap.put(key, exporter);
-
- openServer(url);
- return exporter;
- }
-
- private void openServer(URL url) {
- String key = url.getAddress();
- //client can export a service which's only for server to invoke
- boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
- if (isServer) {
- CloseableChannel server = serverMap.get(key);
- if (server == null) {
- synchronized (this) {
- server = serverMap.get(key);
- if (server == null) {
- serverMap.put(key, createServer(url));
- }
- }
- }
- }
- }
-
- private CloseableChannel createServer(URL url) {
- try {
- String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
- int bindPort = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
- if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
- bindIp = NetUtils.ANYHOST;
- }
- return RSocketFactory.receive()
- .acceptor(new SocketAcceptorImpl(bindPort))
- .transport(TcpServerTransport.create(bindIp, bindPort))
- .start()
- .block();
- } catch (Throwable e) {
- throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
- }
- }
-
-
- @Override
- public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
- // create rpc invoker.
- RSocketInvoker<T> invoker = new RSocketInvoker<T>(serviceType, url, getClients(url), invokers);
- invokers.add(invoker);
- return invoker;
- }
-
- private RSocket[] getClients(URL url) {
- // whether to share connection
- boolean service_share_connect = false;
- int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
- // if not configured, connection is shared, otherwise, one connection for one service
- if (connections == 0) {
- service_share_connect = true;
- connections = 1;
- }
-
- RSocket[] clients = new RSocket[connections];
- for (int i = 0; i < clients.length; i++) {
- if (service_share_connect) {
- clients[i] = getSharedClient(url);
- } else {
- clients[i] = initClient(url);
- }
- }
- return clients;
- }
-
- /**
- * Get shared connection
- */
- private RSocket getSharedClient(URL url) {
- String key = url.getAddress();
- RSocket client = referenceClientMap.get(key);
- if (client != null) {
- return client;
- }
-
- locks.putIfAbsent(key, new Object());
- synchronized (locks.get(key)) {
- if (referenceClientMap.containsKey(key)) {
- return referenceClientMap.get(key);
- }
-
- client = initClient(url);
- referenceClientMap.put(key, client);
- locks.remove(key);
- return client;
- }
- }
-
- /**
- * Create new connection
- */
- private RSocket initClient(URL url) {
- try {
- InetSocketAddress serverAddress = new InetSocketAddress(NetUtils.filterLocalHost(url.getHost()), url.getPort());
- RSocket client = RSocketFactory.connect().keepAliveTickPeriod(Duration.ZERO).keepAliveAckTimeout(Duration.ZERO).acceptor(
- rSocket ->
- new AbstractRSocket() {
- public Mono<Payload> requestResponse(Payload payload) {
- //TODO support Mono arg
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Flux<Payload> requestStream(Payload payload) {
- //TODO support Flux arg
- throw new UnsupportedOperationException();
- }
- })
- .transport(TcpClientTransport.create(serverAddress))
- .start()
- .block();
- return client;
- } catch (Throwable e) {
- throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
- }
-
- }
-
- @Override
- public void destroy() {
- for (String key : new ArrayList<String>(serverMap.keySet())) {
- CloseableChannel server = serverMap.remove(key);
- if (server != null) {
- try {
- if (logger.isInfoEnabled()) {
- logger.info("Close dubbo server: " + server.address());
- }
- server.dispose();
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- }
- }
-
- for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
- RSocket client = referenceClientMap.remove(key);
- if (client != null) {
- try {
-// if (logger.isInfoEnabled()) {
-// logger.info("Close dubbo connect: " + client. + "-->" + client.getRemoteAddress());
-// }
- client.dispose();
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- }
- }
- super.destroy();
- }
-
-
- //server process logic
- private class SocketAcceptorImpl implements SocketAcceptor {
-
- private final int port;
-
- public SocketAcceptorImpl(int port) {
- this.port = port;
- }
-
- @Override
- public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
- return Mono.just(
- new AbstractRSocket() {
- public Mono<Payload> requestResponse(Payload payload) {
- try {
- Map<String, Object> metadata = decodeMetadata(payload);
- Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
- Invocation inv = decodeInvocation(payload, metadata, serializeId);
-
- Result result = inv.getInvoker().invoke(inv);
-
- Class<?> retType = RpcUtils.getReturnType(inv);
- //ok
- if (retType != null && Mono.class.isAssignableFrom(retType)) {
- Throwable th = result.getException();
- if (th == null) {
- Mono bizMono = (Mono) result.getValue();
- Mono<Payload> retMono = bizMono.map(new Function<Object, Payload>() {
- @Override
- public Payload apply(Object o) {
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
- out.writeByte((byte) 0);
- out.writeObject(o);
- out.flushBuffer();
- bos.flush();
- bos.close();
- Payload responsePayload = DefaultPayload.create(bos.toByteArray());
- return responsePayload;
- } catch (Throwable t) {
- throw Exceptions.propagate(t);
- }
- }
- }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
- @Override
- public Publisher<Payload> apply(Throwable throwable) {
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
- out.writeByte((byte) RSocketConstants.FLAG_ERROR);
- out.writeObject(throwable);
- out.flushBuffer();
- bos.flush();
- bos.close();
- Payload errorPayload = DefaultPayload.create(bos.toByteArray());
- return Flux.just(errorPayload);
- } catch (Throwable t) {
- throw Exceptions.propagate(t);
- }
- }
- });
-
- return retMono;
- } else {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
- out.writeByte((byte) RSocketConstants.FLAG_ERROR);
- out.writeObject(th);
- out.flushBuffer();
- bos.flush();
- bos.close();
- Payload errorPayload = DefaultPayload.create(bos.toByteArray());
- return Mono.just(errorPayload);
- }
-
- } else {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
- int flag = RSocketConstants.FLAG_HAS_ATTACHMENT;
-
- Throwable th = result.getException();
- if (th == null) {
- Object ret = result.getValue();
- if (ret == null) {
- flag |= RSocketConstants.FLAG_NULL_VALUE;
- out.writeByte((byte) flag);
- } else {
- out.writeByte((byte) flag);
- out.writeObject(ret);
- }
- } else {
- flag |= RSocketConstants.FLAG_ERROR;
- out.writeByte((byte) flag);
- out.writeObject(th);
- }
- out.writeObject(result.getAttachments());
- out.flushBuffer();
- bos.flush();
- bos.close();
-
- Payload responsePayload = DefaultPayload.create(bos.toByteArray());
- return Mono.just(responsePayload);
- }
- } catch (Throwable t) {
- //application error
- return Mono.error(t);
- } finally {
- payload.release();
- }
- }
-
- public Flux<Payload> requestStream(Payload payload) {
- try {
- Map<String, Object> metadata = decodeMetadata(payload);
- Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
- Invocation inv = decodeInvocation(payload, metadata, serializeId);
-
- Result result = inv.getInvoker().invoke(inv);
- //Class<?> retType = RpcUtils.getReturnType(inv);
-
- Throwable th = result.getException();
- if (th != null) {
- Payload errorPayload = encodeError(th, serializeId);
- return Flux.just(errorPayload);
- }
-
- Flux flux = (Flux) result.getValue();
- Flux<Payload> retFlux = flux.map(new Function<Object, Payload>() {
- @Override
- public Payload apply(Object o) {
- try {
- return encodeData(o, serializeId);
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- }
- }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
- @Override
- public Publisher<Payload> apply(Throwable throwable) {
- try {
- Payload errorPayload = encodeError(throwable, serializeId);
- return Flux.just(errorPayload);
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- }
- });
- return retFlux;
- } catch (Throwable t) {
- return Flux.error(t);
- } finally {
- payload.release();
- }
- }
-
- private Payload encodeData(Object data, byte serializeId) throws Throwable {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
- out.writeByte((byte) 0);
- out.writeObject(data);
- out.flushBuffer();
- bos.flush();
- bos.close();
- return DefaultPayload.create(bos.toByteArray());
- }
-
- private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
- out.writeByte((byte) RSocketConstants.FLAG_ERROR);
- out.writeObject(throwable);
- out.flushBuffer();
- bos.flush();
- bos.close();
- return DefaultPayload.create(bos.toByteArray());
- }
-
- private Map<String, Object> decodeMetadata(Payload payload) throws IOException {
- ByteBuffer metadataBuffer = payload.getMetadata();
- byte[] metadataBytes = new byte[metadataBuffer.remaining()];
- metadataBuffer.get(metadataBytes, metadataBuffer.position(), metadataBuffer.remaining());
- return MetadataCodec.decodeMetadata(metadataBytes);
- }
-
- private Invocation decodeInvocation(Payload payload, Map<String, Object> metadata, Byte serializeId) throws RemotingException, IOException, ClassNotFoundException {
- Invoker<?> invoker = getInvoker(port, metadata);
-
- String serviceName = (String) metadata.get(RSocketConstants.SERVICE_NAME_KEY);
- String version = (String) metadata.get(RSocketConstants.SERVICE_VERSION_KEY);
- String methodName = (String) metadata.get(RSocketConstants.METHOD_NAME_KEY);
- String paramType = (String) metadata.get(RSocketConstants.PARAM_TYPE_KEY);
-
- ByteBuffer dataBuffer = payload.getData();
- byte[] dataBytes = new byte[dataBuffer.remaining()];
- dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
-
-
- //TODO how to get remote address
- //RpcContext rpcContext = RpcContext.getContext();
- //rpcContext.setRemoteAddress(channel.getRemoteAddress());
-
-
- RpcInvocation inv = new RpcInvocation();
- inv.setInvoker(invoker);
- inv.setAttachment(Constants.PATH_KEY, serviceName);
- inv.setAttachment(Constants.VERSION_KEY, version);
- inv.setMethodName(methodName);
-
-
- InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
- ObjectInput in = CodecSupport.getSerializationById(serializeId).deserialize(null, dataInputStream);
-
- Object[] args;
- Class<?>[] pts;
- String desc = paramType;
- if (desc.length() == 0) {
- pts = new Class<?>[0];
- args = new Object[0];
- } else {
- pts = ReflectUtils.desc2classArray(desc);
- args = new Object[pts.length];
- for (int i = 0; i < args.length; i++) {
- try {
- args[i] = in.readObject(pts[i]);
- } catch (Exception e) {
- if (log.isWarnEnabled()) {
- log.warn("Decode argument failed: " + e.getMessage(), e);
- }
- }
- }
- }
- inv.setParameterTypes(pts);
- inv.setArguments(args);
- Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
- if (map != null && map.size() > 0) {
- inv.addAttachments(map);
- }
- return inv;
- }
- });
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.rsocket;
+
+import io.rsocket.AbstractRSocket;
+import io.rsocket.ConnectionSetupPayload;
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.RSocketFactory;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.transport.netty.server.CloseableChannel;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.DefaultPayload;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.protocol.AbstractProtocol;
+import org.apache.dubbo.rpc.support.RpcUtils;
+import org.reactivestreams.Publisher;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+public class RSocketProtocol extends AbstractProtocol {
+
+ public static final String NAME = "rsocket";
+ public static final int DEFAULT_PORT = 30880;
+ private static final Logger log = LoggerFactory.getLogger(RSocketProtocol.class);
+ private static RSocketProtocol INSTANCE;
+
+ // <host:port,CloseableChannel>
+ private final Map<String, CloseableChannel> serverMap = new ConcurrentHashMap<String, CloseableChannel>();
+
+ // <host:port,RSocket>
+ private final Map<String, RSocket> referenceClientMap = new ConcurrentHashMap<String, RSocket>();
+
+ private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
+
+ public RSocketProtocol() {
+ INSTANCE = this;
+ }
+
+ public static RSocketProtocol getRSocketProtocol() {
+ if (INSTANCE == null) {
+ ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(RSocketProtocol.NAME); // load
+ }
+ return INSTANCE;
+ }
+
+ public Collection<Exporter<?>> getExporters() {
+ return Collections.unmodifiableCollection(exporterMap.values());
+ }
+
+ Map<String, Exporter<?>> getExporterMap() {
+ return exporterMap;
+ }
+
+ Invoker<?> getInvoker(int port, Map<String, Object> metadataMap) throws RemotingException {
+ String path = (String) metadataMap.get(RSocketConstants.SERVICE_NAME_KEY);
+ String serviceKey = serviceKey(port, path, (String) metadataMap.get(RSocketConstants.SERVICE_VERSION_KEY), (String) metadataMap.get(Constants.GROUP_KEY));
+ RSocketExporter<?> exporter = (RSocketExporter<?>) exporterMap.get(serviceKey);
+ if (exporter == null) {
+ //throw new Throwable("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
+ throw new RuntimeException("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch ");
+ }
+
+ return exporter.getInvoker();
+ }
+
+ public Collection<Invoker<?>> getInvokers() {
+ return Collections.unmodifiableCollection(invokers);
+ }
+
+ @Override
+ public int getDefaultPort() {
+ return DEFAULT_PORT;
+ }
+
+ @Override
+ public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
+ URL url = invoker.getUrl();
+
+ // export service.
+ String key = serviceKey(url);
+ RSocketExporter<T> exporter = new RSocketExporter<T>(invoker, key, exporterMap);
+ exporterMap.put(key, exporter);
+
+ openServer(url);
+ return exporter;
+ }
+
+ private void openServer(URL url) {
+ String key = url.getAddress();
+ //client can export a service which's only for server to invoke
+ boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
+ if (isServer) {
+ CloseableChannel server = serverMap.get(key);
+ if (server == null) {
+ synchronized (this) {
+ server = serverMap.get(key);
+ if (server == null) {
+ serverMap.put(key, createServer(url));
+ }
+ }
+ }
+ }
+ }
+
+ private CloseableChannel createServer(URL url) {
+ try {
+ String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
+ int bindPort = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
+ if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
+ bindIp = NetUtils.ANYHOST;
+ }
+ return RSocketFactory.receive()
+ .acceptor(new SocketAcceptorImpl(bindPort))
+ .transport(TcpServerTransport.create(bindIp, bindPort))
+ .start()
+ .block();
+ } catch (Throwable e) {
+ throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
+ }
+ }
+
+
+ @Override
+ public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
+ // create rpc invoker.
+ RSocketInvoker<T> invoker = new RSocketInvoker<T>(serviceType, url, getClients(url), invokers);
+ invokers.add(invoker);
+ return invoker;
+ }
+
+ private RSocket[] getClients(URL url) {
+ // whether to share connection
+ boolean service_share_connect = false;
+ int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
+ // if not configured, connection is shared, otherwise, one connection for one service
+ if (connections == 0) {
+ service_share_connect = true;
+ connections = 1;
+ }
+
+ RSocket[] clients = new RSocket[connections];
+ for (int i = 0; i < clients.length; i++) {
+ if (service_share_connect) {
+ clients[i] = getSharedClient(url);
+ } else {
+ clients[i] = initClient(url);
+ }
+ }
+ return clients;
+ }
+
+ /**
+ * Get shared connection
+ */
+ private RSocket getSharedClient(URL url) {
+ String key = url.getAddress();
+ RSocket client = referenceClientMap.get(key);
+ if (client != null) {
+ return client;
+ }
+
+ locks.putIfAbsent(key, new Object());
+ synchronized (locks.get(key)) {
+ if (referenceClientMap.containsKey(key)) {
+ return referenceClientMap.get(key);
+ }
+
+ client = initClient(url);
+ referenceClientMap.put(key, client);
+ locks.remove(key);
+ return client;
+ }
+ }
+
+ /**
+ * Create new connection
+ */
+ private RSocket initClient(URL url) {
+ try {
+ InetSocketAddress serverAddress = new InetSocketAddress(NetUtils.filterLocalHost(url.getHost()), url.getPort());
+ RSocket client = RSocketFactory.connect().keepAliveTickPeriod(Duration.ZERO).keepAliveAckTimeout(Duration.ZERO).acceptor(
+ rSocket ->
+ new AbstractRSocket() {
+ public Mono<Payload> requestResponse(Payload payload) {
+ //TODO support Mono arg
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Flux<Payload> requestStream(Payload payload) {
+ //TODO support Flux arg
+ throw new UnsupportedOperationException();
+ }
+ })
+ .transport(TcpClientTransport.create(serverAddress))
+ .start()
+ .block();
+ return client;
+ } catch (Throwable e) {
+ throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public void destroy() {
+ for (String key : new ArrayList<String>(serverMap.keySet())) {
+ CloseableChannel server = serverMap.remove(key);
+ if (server != null) {
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Close dubbo server: " + server.address());
+ }
+ server.dispose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+ }
+
+ for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
+ RSocket client = referenceClientMap.remove(key);
+ if (client != null) {
+ try {
+// if (logger.isInfoEnabled()) {
+// logger.info("Close dubbo connect: " + client. + "-->" + client.getRemoteAddress());
+// }
+ client.dispose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+ }
+ super.destroy();
+ }
+
+
+ //server process logic
+ private class SocketAcceptorImpl implements SocketAcceptor {
+
+ private final int port;
+
+ public SocketAcceptorImpl(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
+ return Mono.just(
+ new AbstractRSocket() {
+ public Mono<Payload> requestResponse(Payload payload) {
+ try {
+ Map<String, Object> metadata = decodeMetadata(payload);
+ Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
+ Invocation inv = decodeInvocation(payload, metadata, serializeId);
+
+ Result result = inv.getInvoker().invoke(inv);
+
+ Class<?> retType = RpcUtils.getReturnType(inv);
+ //ok
+ if (retType != null && Mono.class.isAssignableFrom(retType)) {
+ Throwable th = result.getException();
+ if (th == null) {
+ Mono bizMono = (Mono) result.getValue();
+ Mono<Payload> retMono = bizMono.map(new Function<Object, Payload>() {
+ @Override
+ public Payload apply(Object o) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) 0);
+ out.writeObject(o);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ Payload responsePayload = DefaultPayload.create(bos.toByteArray());
+ return responsePayload;
+ } catch (Throwable t) {
+ throw Exceptions.propagate(t);
+ }
+ }
+ }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
+ @Override
+ public Publisher<Payload> apply(Throwable throwable) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+ out.writeObject(throwable);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ Payload errorPayload = DefaultPayload.create(bos.toByteArray());
+ return Flux.just(errorPayload);
+ } catch (Throwable t) {
+ throw Exceptions.propagate(t);
+ }
+ }
+ });
+
+ return retMono;
+ } else {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+ out.writeObject(th);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ Payload errorPayload = DefaultPayload.create(bos.toByteArray());
+ return Mono.just(errorPayload);
+ }
+
+ } else {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ int flag = RSocketConstants.FLAG_HAS_ATTACHMENT;
+
+ Throwable th = result.getException();
+ if (th == null) {
+ Object ret = result.getValue();
+ if (ret == null) {
+ flag |= RSocketConstants.FLAG_NULL_VALUE;
+ out.writeByte((byte) flag);
+ } else {
+ out.writeByte((byte) flag);
+ out.writeObject(ret);
+ }
+ } else {
+ flag |= RSocketConstants.FLAG_ERROR;
+ out.writeByte((byte) flag);
+ out.writeObject(th);
+ }
+ out.writeObject(result.getAttachments());
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+
+ Payload responsePayload = DefaultPayload.create(bos.toByteArray());
+ return Mono.just(responsePayload);
+ }
+ } catch (Throwable t) {
+ //application error
+ return Mono.error(t);
+ } finally {
+ payload.release();
+ }
+ }
+
+ public Flux<Payload> requestStream(Payload payload) {
+ try {
+ Map<String, Object> metadata = decodeMetadata(payload);
+ Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue();
+ Invocation inv = decodeInvocation(payload, metadata, serializeId);
+
+ Result result = inv.getInvoker().invoke(inv);
+ //Class<?> retType = RpcUtils.getReturnType(inv);
+
+ Throwable th = result.getException();
+ if (th != null) {
+ Payload errorPayload = encodeError(th, serializeId);
+ return Flux.just(errorPayload);
+ }
+
+ Flux flux = (Flux) result.getValue();
+ Flux<Payload> retFlux = flux.map(new Function<Object, Payload>() {
+ @Override
+ public Payload apply(Object o) {
+ try {
+ return encodeData(o, serializeId);
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+ }).onErrorResume(new Function<Throwable, Publisher<Payload>>() {
+ @Override
+ public Publisher<Payload> apply(Throwable throwable) {
+ try {
+ Payload errorPayload = encodeError(throwable, serializeId);
+ return Flux.just(errorPayload);
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+ });
+ return retFlux;
+ } catch (Throwable t) {
+ return Flux.error(t);
+ } finally {
+ payload.release();
+ }
+ }
+
+ private Payload encodeData(Object data, byte serializeId) throws Throwable {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) 0);
+ out.writeObject(data);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ return DefaultPayload.create(bos.toByteArray());
+ }
+
+ private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos);
+ out.writeByte((byte) RSocketConstants.FLAG_ERROR);
+ out.writeObject(throwable);
+ out.flushBuffer();
+ bos.flush();
+ bos.close();
+ return DefaultPayload.create(bos.toByteArray());
+ }
+
+ private Map<String, Object> decodeMetadata(Payload payload) throws IOException {
+ ByteBuffer metadataBuffer = payload.getMetadata();
+ byte[] metadataBytes = new byte[metadataBuffer.remaining()];
+ metadataBuffer.get(metadataBytes, metadataBuffer.position(), metadataBuffer.remaining());
+ return MetadataCodec.decodeMetadata(metadataBytes);
+ }
+
+ private Invocation decodeInvocation(Payload payload, Map<String, Object> metadata, Byte serializeId) throws RemotingException, IOException, ClassNotFoundException {
+ Invoker<?> invoker = getInvoker(port, metadata);
+
+ String serviceName = (String) metadata.get(RSocketConstants.SERVICE_NAME_KEY);
+ String version = (String) metadata.get(RSocketConstants.SERVICE_VERSION_KEY);
+ String methodName = (String) metadata.get(RSocketConstants.METHOD_NAME_KEY);
+ String paramType = (String) metadata.get(RSocketConstants.PARAM_TYPE_KEY);
+
+ ByteBuffer dataBuffer = payload.getData();
+ byte[] dataBytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
+
+
+ //TODO how to get remote address
+ //RpcContext rpcContext = RpcContext.getContext();
+ //rpcContext.setRemoteAddress(channel.getRemoteAddress());
+
+
+ RpcInvocation inv = new RpcInvocation();
+ inv.setInvoker(invoker);
+ inv.setAttachment(Constants.PATH_KEY, serviceName);
+ inv.setAttachment(Constants.VERSION_KEY, version);
+ inv.setMethodName(methodName);
+
+
+ InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
+ ObjectInput in = CodecSupport.getSerializationById(serializeId).deserialize(null, dataInputStream);
+
+ Object[] args;
+ Class<?>[] pts;
+ String desc = paramType;
+ if (desc.length() == 0) {
+ pts = new Class<?>[0];
+ args = new Object[0];
+ } else {
+ pts = ReflectUtils.desc2classArray(desc);
+ args = new Object[pts.length];
+ for (int i = 0; i < args.length; i++) {
+ try {
+ args[i] = in.readObject(pts[i]);
+ } catch (Exception e) {
+ if (log.isWarnEnabled()) {
+ log.warn("Decode argument failed: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ inv.setParameterTypes(pts);
+ inv.setArguments(args);
+ Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
+ if (map != null && map.size() > 0) {
+ inv.addAttachments(map);
+ }
+ return inv;
+ }
+ });
+ }
+ }
+}