You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/25 06:38:58 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] Add TransportObserver doc
(#9084)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 4aaad1e [3.0-Triple] Add TransportObserver doc (#9084)
4aaad1e is described below
commit 4aaad1e6e986ac946097f5b9abc72f04eb08ca5b
Author: GuoHao <gu...@gmail.com>
AuthorDate: Mon Oct 25 14:38:48 2021 +0800
[3.0-Triple] Add TransportObserver doc (#9084)
* Add TransportObserver doc
* Reactor TransportObserver
* Reactor MessageObserver
* Minor bugfix
* remove unused http2error
* opt server stream condition
* fix ServerOutboundTransportObserver
* Ignore unused msg
* Remove TripleUtil
* Add some doc
Co-authored-by: earthchen <yo...@duobei.com>
---
.../apache/dubbo/common/stream/StreamObserver.java | 12 +-
.../rpc/protocol/tri/AbstractClientStream.java | 250 ++++++++++++-------
.../rpc/protocol/tri/AbstractServerStream.java | 61 ++++-
.../dubbo/rpc/protocol/tri/AbstractStream.java | 277 +++++++++++----------
...r.java => ClientOutboundTransportObserver.java} | 16 +-
.../dubbo/rpc/protocol/tri/ClientStream.java | 23 +-
.../rpc/protocol/tri/InboundTransportObserver.java | 60 +++++
...bserver.java => OutboundTransportObserver.java} | 41 ++-
...r.java => ServerOutboundTransportObserver.java} | 20 +-
.../dubbo/rpc/protocol/tri/ServerStream.java | 33 +--
.../org/apache/dubbo/rpc/protocol/tri/Stream.java | 20 +-
.../dubbo/rpc/protocol/tri/TransportObserver.java | 30 ++-
.../dubbo/rpc/protocol/tri/TransportState.java | 16 +-
.../protocol/tri/TripleClientInboundHandler.java | 2 +-
.../protocol/tri/TripleClientRequestHandler.java | 1 -
.../tri/TripleHttp2ClientResponseHandler.java | 16 +-
.../tri/TripleHttp2FrameServerHandler.java | 21 +-
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 3 +-
.../tri/TripleServerConnectionHandler.java | 24 +-
.../protocol/tri/TripleServerInboundHandler.java | 2 +-
.../apache/dubbo/rpc/protocol/tri/TripleUtil.java | 252 -------------------
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 81 ++++--
...ler.java => UnaryInboundTransportObserver.java} | 25 +-
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 14 +-
.../dubbo/rpc/protocol/tri/TransportStateTest.java | 6 +
.../rpc/protocol/tri/UnaryClientStreamTest.java | 4 +-
26 files changed, 672 insertions(+), 638 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java
index debafaa..7ef12ea 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java
@@ -17,16 +17,24 @@
package org.apache.dubbo.common.stream;
+/**
+ * StreamObserver is a common streaming API. It is an observer for receiving messages.
+ * Implementations are NOT required to be thread-safe.
+ *
+ * @param <T> type of message
+ */
public interface StreamObserver<T> {
/**
* onNext
- * @param data
+ *
+ * @param data to process
*/
void onNext(T data);
/**
* onError
- * @param throwable
+ *
+ * @param throwable error
*/
void onError(Throwable throwable);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index f7dab7e..97dbe04 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -20,6 +20,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.serialize.MultipleSerialization;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.remoting.Constants;
@@ -33,15 +34,18 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceModel;
import org.apache.dubbo.triple.TripleWrapper;
+import com.google.protobuf.ByteString;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.util.AsciiString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -51,6 +55,9 @@ import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+/**
+ * Abstracting common actions for client streaming.
+ */
public abstract class AbstractClientStream extends AbstractStream implements Stream {
private final AsciiString scheme;
@@ -62,7 +69,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
protected AbstractClientStream(URL url) {
super(url);
this.scheme = getSchemeFromUrl(url);
- // for client cancel,send rst frame to server
this.getCancellationContext().addListener(context -> {
Throwable throwable = this.getCancellationContext().getCancellationCause();
if (LOGGER.isWarnEnabled()) {
@@ -70,7 +76,9 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
+ getConsumerModel().getServiceName() + "#" + getMethodName() +
" was canceled by local exception ", throwable);
}
- this.asTransportObserver().onReset(getHttp2Error(throwable));
+ // for client cancel,send rst frame to server
+ this.outboundTransportObserver()
+ .onError(GrpcStatus.fromCode(GrpcStatus.Code.CANCELLED).withCause(throwable));
});
}
@@ -83,6 +91,13 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
return new ClientStream(url);
}
+ /**
+ * TODO move this method to somewhere else
+ *
+ * @param req the request
+ * @param connection connection
+ * @return a client stream
+ */
public static AbstractClientStream newClientStream(Request req, Connection connection) {
final RpcInvocation inv = (RpcInvocation) req.getData();
final URL url = inv.getInvoker().getUrl();
@@ -100,6 +115,32 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
return stream;
}
+ private static Compressor getCompressor(URL url, ServiceModel model) {
+ String compressorStr = url.getParameter(COMPRESSOR_KEY);
+ if (compressorStr == null) {
+ // Compressor can not be set by dynamic config
+ compressorStr = ConfigurationUtils
+ .getCachedDynamicProperty(model.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+ }
+ return Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
+ }
+
+ /**
+ * Get the tri protocol special MethodDescriptor
+ */
+ private static MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
+ List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
+ if (CollectionUtils.isEmpty(methodDescriptors)) {
+ throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+ }
+ for (MethodDescriptor methodDescriptor : methodDescriptors) {
+ if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
+ return methodDescriptor;
+ }
+ }
+ throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+ }
+
protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
execute(() -> {
channel.pipeline()
@@ -107,9 +148,10 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
.addLast(new TripleClientInboundHandler());
channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(this);
- final ClientTransportObserver clientTransportObserver = new ClientTransportObserver(channel, promise);
+ final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(channel, promise);
subscribe(clientTransportObserver);
try {
+ DefaultFuture2.addTimeoutListener(getRequestId(), channel::close);
doOnStartCall();
} catch (Throwable throwable) {
cancel(throwable);
@@ -125,63 +167,8 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
return new ClientStreamObserverImpl(getCancellationContext());
}
- protected class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
-
- public ClientStreamObserverImpl(CancellationContext cancellationContext) {
- super(cancellationContext);
- }
-
- @Override
- public void onNext(Object data) {
- if (getState().allowSendMeta()) {
- final Metadata metadata = createRequestMeta(getRpcInvocation());
- getTransportSubscriber().onMetadata(metadata, false);
- }
- if (getState().allowSendData()) {
- final byte[] bytes = encodeRequest(data);
- getTransportSubscriber().onData(bytes, false);
- }
- }
-
- /**
- * Handle all exceptions in the request process, other procedures directly throw
- * <p>
- * other procedures is {@link ClientStreamObserver#onNext(Object)} and {@link ClientStreamObserver#onCompleted()}
- */
- @Override
- public void onError(Throwable throwable) {
- if (getState().allowSendEndStream()) {
- GrpcStatus status = GrpcStatus.getStatus(throwable);
- transportError(status, null, getState().allowSendMeta());
- } else {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("Triple request to "
- + getConsumerModel().getServiceName() + "#" + getMethodName() +
- " was failed by exception ", throwable);
- }
- }
- }
-
- @Override
- public void onCompleted() {
- if (getState().allowSendEndStream()) {
- getTransportSubscriber().onComplete();
- }
- }
-
- @Override
- public void setCompression(String compression) {
- if (!getState().allowSendMeta()) {
- cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
- return;
- }
- Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
- setCompressor(compressor);
- }
- }
-
@Override
- protected void cancelByRemoteReset(Http2Error http2Error) {
+ protected void cancelByRemoteReset() {
DefaultFuture2.getFuture(getRequestId()).cancel();
}
@@ -190,18 +177,17 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
getCancellationContext().cancel(throwable);
}
-
@Override
public void execute(Runnable runnable) {
try {
super.execute(runnable);
} catch (RejectedExecutionException e) {
LOGGER.error("Consumer's thread pool is full", e);
- getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
+ outboundMessageSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
.withDescription("Consumer's thread pool is full").asException());
} catch (Throwable t) {
LOGGER.error("Consumer submit request to thread pool error ", t);
- getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ outboundMessageSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withCause(t)
.withDescription("Consumer's error")
.asException());
@@ -243,11 +229,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
}
}
- private Http2Error getHttp2Error(Throwable throwable) {
- // todo Convert the exception to http2Error
- return Http2Error.CANCEL;
- }
-
public ConsumerModel getConsumerModel() {
return consumerModel;
}
@@ -270,17 +251,53 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
} else {
obj = getRequestValue(value);
}
- out = TripleUtil.pack(obj);
+ out = pack(obj);
return super.compress(out);
}
private TripleWrapper.TripleRequestWrapper getRequestWrapper(Object value) {
if (getMethodDescriptor().isStream()) {
String type = getMethodDescriptor().getParameterClasses()[0].getName();
- return TripleUtil.wrapReq(getUrl(), getSerializeType(), value, type, getMultipleSerialization());
+ return wrapReq(getUrl(), getSerializeType(), value, type, getMultipleSerialization());
} else {
RpcInvocation invocation = (RpcInvocation) value;
- return TripleUtil.wrapReq(getUrl(), invocation, getMultipleSerialization());
+ return wrapReq(getUrl(), invocation, getMultipleSerialization());
+ }
+ }
+
+ private TripleWrapper.TripleRequestWrapper wrapReq(URL url, RpcInvocation invocation,
+ MultipleSerialization serialization) {
+ try {
+ String serializationName = (String) invocation.getObjectAttachment(Constants.SERIALIZATION_KEY);
+ final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
+ .setSerializeType(convertHessianToWrapper(serializationName));
+ for (int i = 0; i < invocation.getArguments().length; i++) {
+ final String clz = invocation.getParameterTypes()[i].getName();
+ builder.addArgTypes(clz);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ serialization.serialize(url, serializationName, clz, invocation.getArguments()[i], bos);
+ builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to pack wrapper req", e);
+ }
+ }
+
+ public TripleWrapper.TripleRequestWrapper wrapReq(URL url, String serializeType, Object req,
+ String type,
+ MultipleSerialization multipleSerialization) {
+ try {
+ final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
+ .addArgTypes(type)
+ .setSerializeType(convertHessianToWrapper(serializeType));
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ multipleSerialization.serialize(url, serializeType, type, req, bos);
+ builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
+ bos.close();
+ return builder.build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to pack wrapper req", e);
}
}
@@ -299,22 +316,36 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
ClassLoadUtil.switchContextLoader(getConsumerModel().getClassLoader());
}
if (getMethodDescriptor().isNeedWrap()) {
- final TripleWrapper.TripleResponseWrapper wrapper = TripleUtil.unpack(data,
+ final TripleWrapper.TripleResponseWrapper wrapper = unpack(data,
TripleWrapper.TripleResponseWrapper.class);
- if (!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()))) {
+ if (!getSerializeType().equals(convertHessianFromWrapper(wrapper.getSerializeType()))) {
throw new UnsupportedOperationException("Received inconsistent serialization type from server, " +
"reject to deserialize! Expected:" + getSerializeType() +
- " Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()));
+ " Actual:" + convertHessianFromWrapper(wrapper.getSerializeType()));
}
- return TripleUtil.unwrapResp(getUrl(), wrapper, getMultipleSerialization());
+ return unwrapResp(getUrl(), wrapper, getMultipleSerialization());
} else {
- return TripleUtil.unpack(data, getMethodDescriptor().getReturnClass());
+ return unpack(data, getMethodDescriptor().getReturnClass());
}
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}
}
+ public Object unwrapResp(URL url, TripleWrapper.TripleResponseWrapper wrap,
+ MultipleSerialization serialization) {
+ String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
+ try {
+ final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
+ final Object ret = serialization.deserialize(url, serializeType, wrap.getType(), bais);
+ bais.close();
+ return ret;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to unwrap resp", e);
+ }
+ }
+
+
protected Metadata createRequestMeta(RpcInvocation inv) {
Metadata metadata = new DefaultMetadata();
// put http2 params
@@ -347,29 +378,58 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
return "/" + inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName();
}
- private static Compressor getCompressor(URL url, ServiceModel model) {
- String compressorStr = url.getParameter(COMPRESSOR_KEY);
- if (compressorStr == null) {
- // Compressor can not be set by dynamic config
- compressorStr = ConfigurationUtils
- .getCachedDynamicProperty(model.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+ protected class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+
+ public ClientStreamObserverImpl(CancellationContext cancellationContext) {
+ super(cancellationContext);
}
- return Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
- }
- /**
- * Get the tri protocol special MethodDescriptor
- */
- private static MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
- List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
- if (CollectionUtils.isEmpty(methodDescriptors)) {
- throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+ @Override
+ public void onNext(Object data) {
+ if (getState().allowSendMeta()) {
+ final Metadata metadata = createRequestMeta(getRpcInvocation());
+ outboundTransportObserver().onMetadata(metadata, false);
+ }
+ if (getState().allowSendData()) {
+ final byte[] bytes = encodeRequest(data);
+ outboundTransportObserver().onData(bytes, false);
+ }
}
- for (MethodDescriptor methodDescriptor : methodDescriptors) {
- if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
- return methodDescriptor;
+
+ /**
+ * Handle all exceptions in the request process, other procedures directly throw
+ * <p>
+ * other procedures is {@link ClientStreamObserver#onNext(Object)} and {@link ClientStreamObserver#onCompleted()}
+ */
+ @Override
+ public void onError(Throwable throwable) {
+ if (getState().allowSendEndStream()) {
+ GrpcStatus status = GrpcStatus.getStatus(throwable);
+ transportError(status, null, getState().allowSendMeta());
+ } else {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("Triple request to "
+ + getConsumerModel().getServiceName() + "#" + getMethodName() +
+ " was failed by exception ", throwable);
+ }
}
}
- throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+
+ @Override
+ public void onCompleted() {
+ if (getState().allowSendEndStream()) {
+ outboundTransportObserver().onComplete();
+ }
+ }
+
+ @Override
+ public void setCompression(String compression) {
+ if (!getState().allowSendMeta()) {
+ cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+ return;
+ }
+ Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+ setCompressor(compressor);
+ }
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index f566641..17dd26d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.serialize.MultipleSerialization;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.Invoker;
@@ -30,12 +31,15 @@ import org.apache.dubbo.rpc.model.ScopeModelUtil;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.triple.TripleWrapper;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -149,13 +153,13 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
ClassLoadUtil.switchContextLoader(getProviderModel().getServiceInterfaceClass().getClassLoader());
}
if (getMethodDescriptor() == null || getMethodDescriptor().isNeedWrap()) {
- final TripleWrapper.TripleRequestWrapper wrapper = TripleUtil.unpack(data,
+ final TripleWrapper.TripleRequestWrapper wrapper = unpack(data,
TripleWrapper.TripleRequestWrapper.class);
- if (!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()))) {
+ if (!getSerializeType().equals(convertHessianFromWrapper(wrapper.getSerializeType()))) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INVALID_ARGUMENT)
.withDescription("Received inconsistent serialization type from client, " +
"reject to deserialize! Expected:" + getSerializeType() +
- " Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
+ " Actual:" + convertHessianFromWrapper(wrapper.getSerializeType())));
return null;
}
if (getMethodDescriptor() == null) {
@@ -175,9 +179,9 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
return null;
}
}
- return TripleUtil.unwrapReq(getUrl(), wrapper, getMultipleSerialization());
+ return unwrapReq(getUrl(), wrapper, getMultipleSerialization());
} else {
- return new Object[]{TripleUtil.unpack(data, getMethodDescriptor().getParameterClasses()[0])};
+ return new Object[]{unpack(data, getMethodDescriptor().getParameterClasses()[0])};
}
} catch (Throwable throwable) {
LOGGER.warn("Decode request failed:", throwable);
@@ -189,6 +193,23 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
}
}
+ private Object[] unwrapReq(URL url, TripleWrapper.TripleRequestWrapper wrap,
+ MultipleSerialization multipleSerialization) {
+ String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
+ try {
+ Object[] arguments = new Object[wrap.getArgsCount()];
+ for (int i = 0; i < arguments.length; i++) {
+ final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getArgs(i).toByteArray());
+ Object obj = multipleSerialization.deserialize(url,
+ serializeType, wrap.getArgTypes(i), bais);
+ arguments[i] = obj;
+ }
+ return arguments;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to unwrap req: " + e.getMessage(), e);
+ }
+ }
+
/**
* create basic meta data
*/
@@ -209,12 +230,12 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
}
final Message message;
if (getMethodDescriptor().isNeedWrap()) {
- message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
+ message = wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
getMultipleSerialization());
} else {
message = (Message) value;
}
- byte[] out = TripleUtil.pack(message);
+ byte[] out = pack(message);
return super.compress(out);
} catch (Throwable throwable) {
LOGGER.error("Encode Response data error ", throwable);
@@ -263,13 +284,33 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
}
@Override
- protected void cancelByRemoteReset(Http2Error http2Error) {
+ protected void cancelByRemoteReset() {
getCancellationContext().cancel(null);
}
@Override
protected void cancelByLocal(Throwable throwable) {
- asTransportObserver().onReset(Http2Error.CANCEL);
+ inboundTransportObserver()
+ .onError(GrpcStatus.fromCode(GrpcStatus.Code.CANCELLED)
+ .withCause(throwable));
}
+
+ public TripleWrapper.TripleResponseWrapper wrapResp(URL url, String serializeType, Object resp,
+ MethodDescriptor desc,
+ MultipleSerialization multipleSerialization) {
+ try {
+ final TripleWrapper.TripleResponseWrapper.Builder builder = TripleWrapper.TripleResponseWrapper.newBuilder()
+ .setType(desc.getReturnClass().getName())
+ .setSerializeType(convertHessianToWrapper(serializeType));
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ multipleSerialization.serialize(url, serializeType, desc.getReturnClass().getName(), resp, bos);
+ builder.setData(ByteString.copyFrom(bos.toByteArray()));
+ bos.close();
+ return builder.build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to pack wrapper req", e);
+ }
+ }
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index aaac6d8..1220ec7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -26,28 +26,37 @@ import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.Constants;
import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.Status;
-import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
+/**
+ * AbstractStream provides more detailed actions for streaming process.
+ */
public abstract class AbstractStream implements Stream {
- protected static final String DUPLICATED_DATA = "Duplicated data";
+
+ private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
+ private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder().withoutPadding();
private final URL url;
private final MultipleSerialization multipleSerialization;
- private final StreamObserver<Object> streamObserver;
- private final TransportObserver transportObserver;
+ private final StreamObserver<Object> inboundMessageObserver;
+ private final InboundTransportObserver inboundTransportObserver;
private final Executor executor;
private final CancellationContext cancellationContext;
// AcceptEncoding does not change after the application is started,
@@ -57,8 +66,8 @@ public abstract class AbstractStream implements Stream {
private MethodDescriptor methodDescriptor;
private String methodName;
private String serializeType;
- private StreamObserver<Object> streamSubscriber;
- private AbstractChannelTransportObserver transportSubscriber;
+ private StreamObserver<Object> outboundMessageSubscriber;
+ private OutboundTransportObserver outboundTransportObserver;
private Compressor compressor = IdentityCompressor.NONE;
private Compressor deCompressor = IdentityCompressor.NONE;
private volatile boolean cancelled = false;
@@ -70,16 +79,53 @@ public abstract class AbstractStream implements Stream {
protected AbstractStream(URL url, Executor executor) {
this.url = url;
final Executor sourceExecutor = lookupExecutor(url, executor);
+ // wrap executor to ensure linear stream message processing
this.executor = wrapperSerializingExecutor(sourceExecutor);
final String value = url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY);
- this.multipleSerialization = url.getOrDefaultFrameworkModel().getExtensionLoader(MultipleSerialization.class)
+ this.multipleSerialization = url.getOrDefaultFrameworkModel()
+ .getExtensionLoader(MultipleSerialization.class)
.getExtension(value);
this.cancellationContext = new CancellationContext();
- this.transportObserver = createTransportObserver();
- this.streamObserver = createStreamObserver();
+ // A stream implementation must know how to process inbound transport message
+ this.inboundTransportObserver = createInboundTransportObserver();
+ // A stream implementation must know how to process inbound App level message
+ this.inboundMessageObserver = createStreamObserver();
this.acceptEncoding = Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel());
}
+
+ /**
+ * Cancel by remote by receiving reset frame
+ */
+ protected abstract void cancelByRemoteReset();
+
+ /**
+ * Cancel by local by some error
+ *
+ * @param throwable the cancel cause
+ */
+ protected abstract void cancelByLocal(Throwable throwable);
+
+ /**
+ * create request StreamObserver
+ */
+ protected abstract StreamObserver<Object> createStreamObserver();
+
+ /**
+ * create response TransportObserver
+ */
+ protected abstract InboundTransportObserver createInboundTransportObserver();
+
+ private void closeQuietly(Closeable c) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (IOException ignore) {
+ // ignored
+ }
+ }
+ }
+
private Executor lookupExecutor(URL url, Executor executor) {
// only server maybe not null
if (executor != null) {
@@ -104,7 +150,7 @@ public abstract class AbstractStream implements Stream {
}
public TransportState getState() {
- return transportSubscriber.state;
+ return outboundTransportObserver.state;
}
public boolean isCancelled() {
@@ -147,32 +193,16 @@ public abstract class AbstractStream implements Stream {
private void cancel() {
cancelled = true;
- execute(RpcContext::removeCancellationContext);
}
/**
* remote cancel
- *
- * @param http2Error {@link Http2Error}
*/
- protected final void cancelByRemote(Http2Error http2Error) {
+ protected final void cancelByRemote() {
cancel();
- cancelByRemoteReset(http2Error);
+ cancelByRemoteReset();
}
- protected abstract void cancelByRemoteReset(Http2Error http2Error);
-
- protected abstract void cancelByLocal(Throwable throwable);
-
- /**
- * create request StreamObserver
- */
- protected abstract StreamObserver<Object> createStreamObserver();
-
- /**
- * create response TransportObserver
- */
- protected abstract TransportObserver createTransportObserver();
public String getSerializeType() {
return serializeType;
@@ -190,12 +220,12 @@ public abstract class AbstractStream implements Stream {
return multipleSerialization;
}
- public StreamObserver<Object> getStreamSubscriber() {
- return streamSubscriber;
+ public StreamObserver<Object> outboundMessageSubscriber() {
+ return outboundMessageSubscriber;
}
- public TransportObserver getTransportSubscriber() {
- return transportSubscriber;
+ public OutboundTransportObserver outboundTransportObserver() {
+ return outboundTransportObserver;
}
public MethodDescriptor getMethodDescriptor() {
@@ -250,23 +280,22 @@ public abstract class AbstractStream implements Stream {
}
@Override
- public void subscribe(StreamObserver<Object> observer) {
- this.streamSubscriber = observer;
+ public void subscribe(StreamObserver<Object> outboundMessageObserver) {
+ this.outboundMessageSubscriber = outboundMessageObserver;
}
@Override
- public void subscribe(AbstractChannelTransportObserver observer) {
- this.transportSubscriber = observer;
+ public void subscribe(OutboundTransportObserver observer) {
+ this.outboundTransportObserver = observer;
}
- @Override
- public StreamObserver<Object> asStreamObserver() {
- return streamObserver;
+ public StreamObserver<Object> inboundMessageObserver() {
+ return inboundMessageObserver;
}
@Override
- public TransportObserver asTransportObserver() {
- return transportObserver;
+ public TransportObserver inboundTransportObserver() {
+ return inboundTransportObserver;
}
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
@@ -274,31 +303,22 @@ public abstract class AbstractStream implements Stream {
if (!onlyTrailers) {
// set metadata
Metadata metadata = new DefaultMetadata();
- getTransportSubscriber().onMetadata(metadata, false);
+ outboundTransportObserver().onMetadata(metadata, false);
}
// set trailers
Metadata trailers = getTrailers(status);
if (attachments != null) {
convertAttachment(trailers, attachments);
}
- getTransportSubscriber().onMetadata(trailers, true);
+ outboundTransportObserver().onMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Error] status=" + status.code.code
+ " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
}
}
- protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
- transportError(status, attachments, false);
- }
-
protected void transportError(GrpcStatus status) {
- transportError(status, null);
- }
-
- protected void transportError(Throwable throwable) {
- GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable, throwable.getMessage());
- transportError(status, null);
+ transportError(status, null, false);
}
private String getGrpcMessage(GrpcStatus status) {
@@ -322,7 +342,7 @@ public abstract class AbstractStream implements Stream {
if (throwable == null) {
Status status = builder.build();
metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
- TripleUtil.encodeBase64ASCII(status.toByteArray()));
+ encodeBase64ASCII(status.toByteArray()));
return metadata;
}
DebugInfo debugInfo = DebugInfo.newBuilder()
@@ -333,10 +353,16 @@ public abstract class AbstractStream implements Stream {
builder.addDetails(Any.pack(debugInfo));
Status status = builder.build();
metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
- TripleUtil.encodeBase64ASCII(status.toByteArray()));
+ encodeBase64ASCII(status.toByteArray()));
return metadata;
}
+ /**
+ * Parse metadata to a KV pairs map.
+ *
+ * @param metadata the metadata from remote
+ * @return KV pairs map
+ */
protected Map<String, Object> parseMetadataToAttachmentMap(Metadata metadata) {
Map<String, Object> attachments = new HashMap<>();
for (Map.Entry<CharSequence, CharSequence> header : metadata) {
@@ -350,7 +376,7 @@ public abstract class AbstractStream implements Stream {
}
if (key.endsWith(TripleConstant.GRPC_BIN_SUFFIX) && key.length() > 4) {
try {
- attachments.put(key.substring(0, key.length() - 4), TripleUtil.decodeASCIIByte(header.getValue()));
+ attachments.put(key.substring(0, key.length() - 4), decodeASCIIByte(header.getValue()));
} catch (Exception e) {
LOGGER.error("Failed to parse response attachment key=" + key, e);
}
@@ -361,6 +387,13 @@ public abstract class AbstractStream implements Stream {
return attachments;
}
+ /**
+ * Parse and put the KV pairs into metadata. Ignore Http2 PseudoHeaderName and internal name.
+ * Only raw byte array or string value will be put.
+ *
+ * @param metadata the metadata holder
+ * @param attachments KV pairs
+ */
protected void convertAttachment(Metadata metadata, Map<String, Object> attachments) {
if (attachments == null) {
return;
@@ -391,7 +424,7 @@ public abstract class AbstractStream implements Stream {
String str = (String) v;
metadata.put(key, str);
} else if (v instanceof byte[]) {
- String str = TripleUtil.encodeBase64ASCII((byte[]) v);
+ String str = encodeBase64ASCII((byte[]) v);
metadata.put(key + TripleConstant.GRPC_BIN_SUFFIX, str);
}
} catch (Throwable t) {
@@ -399,97 +432,73 @@ public abstract class AbstractStream implements Stream {
}
}
- protected byte[] compress(byte[] data) {
- return this.getCompressor().compress(data);
+ protected String convertHessianFromWrapper(String serializeType) {
+ if (TripleConstant.HESSIAN4.equals(serializeType)) {
+ return TripleConstant.HESSIAN2;
+ }
+ return serializeType;
}
- protected byte[] decompress(byte[] data) {
- return this.getDeCompressor().decompress(data);
+ protected <T> T unpack(byte[] data, Class<T> clz) {
+ return unpack(new ByteArrayInputStream(data), clz);
}
- protected abstract class AbstractTransportObserver implements TransportObserver {
- private Metadata headers;
- private Metadata trailers;
-
- public Metadata getHeaders() {
- return headers;
- }
-
- public Metadata getTrailers() {
- return trailers;
+ protected <T> T unpack(InputStream is, Class<T> clz) {
+ try {
+ final T req = SingleProtobufUtils.deserialize(is, clz);
+ is.close();
+ return req;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to unpack req", e);
+ } finally {
+ closeQuietly(is);
}
+ }
- @Override
- public void onReset(Http2Error http2Error) {
- if (getState().allowSendReset()) {
- getState().setResetSend();
- getTransportSubscriber().onReset(http2Error);
- }
+ protected byte[] pack(Object obj) {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ SingleProtobufUtils.serialize(obj, baos);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to pack protobuf object", e);
}
+ return baos.toByteArray();
+ }
- @Override
- public void onMetadata(Metadata metadata, boolean endStream) {
- if (headers == null) {
- headers = metadata;
- } else {
- trailers = metadata;
- }
- }
- protected GrpcStatus extractStatusFromMeta(Metadata metadata) {
- if (metadata.contains(TripleHeaderEnum.STATUS_KEY.getHeader())) {
- final int code = Integer.parseInt(metadata.get(TripleHeaderEnum.STATUS_KEY.getHeader()).toString());
-
- if (!GrpcStatus.Code.isOk(code)) {
- GrpcStatus status = GrpcStatus.fromCode(code);
- if (metadata.contains(TripleHeaderEnum.MESSAGE_KEY.getHeader())) {
- final String raw = metadata.get(TripleHeaderEnum.MESSAGE_KEY.getHeader()).toString();
- status = status.withDescription(GrpcStatus.fromMessage(raw));
- }
- return status;
- }
- return GrpcStatus.fromCode(Code.OK);
- }
- return GrpcStatus.fromCode(Code.OK);
- }
+ protected String encodeBase64ASCII(byte[] in) {
+ byte[] bytes = encodeBase64(in);
+ return new String(bytes, StandardCharsets.US_ASCII);
+ }
+ protected byte[] encodeBase64(byte[] in) {
+ return BASE64_ENCODER.encode(in);
}
- protected abstract class UnaryTransportObserver extends AbstractTransportObserver implements TransportObserver {
- private byte[] data;
+ protected byte[] decodeASCIIByte(CharSequence value) {
+ return BASE64_DECODER.decode(value.toString().getBytes(StandardCharsets.US_ASCII));
+ }
- public byte[] getData() {
- return data;
+ /**
+ * Convert hessian version from Dubbo's SPI version(hessian2) to wrapper API version (hessian4)
+ *
+ * @param serializeType literal type
+ * @return hessian4 if the param is hessian2, otherwise return the param
+ */
+ protected String convertHessianToWrapper(String serializeType) {
+ if (TripleConstant.HESSIAN2.equals(serializeType)) {
+ return TripleConstant.HESSIAN4;
}
+ return serializeType;
+ }
- protected abstract void onError(GrpcStatus status);
-
- @Override
- public void onComplete() {
- execute(() -> {
- final GrpcStatus status = extractStatusFromMeta(getHeaders());
- if (Code.isOk(status.code.code)) {
- doOnComplete();
- } else {
- onError(status);
- }
- });
- }
+ protected byte[] compress(byte[] data) {
+ return this.getCompressor().compress(data);
+ }
- /**
- * This method exception needs to be caught by the implementation class
- */
- protected abstract void doOnComplete();
+ protected byte[] decompress(byte[] data) {
+ return this.getDeCompressor().decompress(data);
+ }
- @Override
- public void onData(byte[] in, boolean endStream) {
- if (data == null) {
- this.data = in;
- } else {
- onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription(DUPLICATED_DATA));
- }
- }
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
similarity index 86%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
index 1a53881..c939ae5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
@@ -17,9 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
@@ -30,13 +27,16 @@ import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
-public class ClientTransportObserver extends AbstractChannelTransportObserver {
+/**
+ * Send stream data to remote
+ * {@link ClientOutboundTransportObserver#promise} will be set success after rst or complete sent,
+ */
+public class ClientOutboundTransportObserver extends OutboundTransportObserver {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClientTransportObserver.class);
private final ChannelPromise promise;
private final Http2StreamChannel streamChannel;
- public ClientTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
+ public ClientOutboundTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
this.streamChannel = channel;
this.promise = promise;
}
@@ -68,8 +68,8 @@ public class ClientTransportObserver extends AbstractChannelTransportObserver {
}
@Override
- protected void doOnReset(Http2Error http2Error) {
- streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+ protected void doOnError(GrpcStatus status) {
+ streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
.addListener(future -> {
if (future.isSuccess()) {
promise.trySuccess();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index b868d87..a41b739 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -31,8 +31,8 @@ public class ClientStream extends AbstractClientStream implements Stream {
}
@Override
- protected TransportObserver createTransportObserver() {
- return new ClientTransportObserverImpl();
+ protected InboundTransportObserver createInboundTransportObserver() {
+ return new ClientStreamInboundTransportObserverImpl();
}
@Override
@@ -47,8 +47,8 @@ public class ClientStream extends AbstractClientStream implements Stream {
StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[1];
obServer = attachCancelContext(obServer, getCancellationContext());
subscribe(obServer);
- asStreamObserver().onNext(getRpcInvocation().getArguments()[0]);
- asStreamObserver().onCompleted();
+ inboundMessageObserver().onNext(getRpcInvocation().getArguments()[0]);
+ inboundMessageObserver().onCompleted();
return new AppResponse();
}
@@ -56,7 +56,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[0];
obServer = attachCancelContext(obServer, getCancellationContext());
subscribe(obServer);
- return new AppResponse(asStreamObserver());
+ return new AppResponse(inboundMessageObserver());
}
private <T> StreamObserver<T> attachCancelContext(StreamObserver<T> observer, CancellationContext context) {
@@ -68,7 +68,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
return observer;
}
- private class ClientTransportObserverImpl extends AbstractTransportObserver {
+ private class ClientStreamInboundTransportObserverImpl extends InboundTransportObserver {
private boolean error = false;
@@ -77,7 +77,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
execute(() -> {
try {
final Object resp = deserializeResponse(data);
- getStreamSubscriber().onNext(resp);
+ outboundMessageSubscriber().onNext(resp);
} catch (Throwable throwable) {
onError(throwable);
}
@@ -85,12 +85,17 @@ public class ClientStream extends AbstractClientStream implements Stream {
}
@Override
+ public void onError(GrpcStatus status) {
+ onError(status.asException());
+ }
+
+ @Override
public void onComplete() {
execute(() -> {
getState().setServerEndStreamReceived();
final GrpcStatus status = extractStatusFromMeta(getHeaders());
if (GrpcStatus.Code.isOk(status.code.code)) {
- getStreamSubscriber().onCompleted();
+ outboundMessageSubscriber().onCompleted();
} else {
onError(status.cause);
}
@@ -105,7 +110,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
if (!getState().serverSendStreamReceived()) {
cancel(throwable);
}
- getStreamSubscriber().onError(throwable);
+ outboundMessageSubscriber().onError(throwable);
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/InboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/InboundTransportObserver.java
new file mode 100644
index 0000000..43050f1
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/InboundTransportObserver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
+
+public abstract class InboundTransportObserver implements TransportObserver {
+ private Metadata headers;
+ private Metadata trailers;
+
+ public Metadata getHeaders() {
+ return headers;
+ }
+
+ public Metadata getTrailers() {
+ return trailers;
+ }
+
+ @Override
+ public void onMetadata(Metadata metadata, boolean endStream) {
+ if (headers == null) {
+ headers = metadata;
+ } else {
+ trailers = metadata;
+ }
+ }
+
+ protected GrpcStatus extractStatusFromMeta(Metadata metadata) {
+ if (!metadata.contains(TripleHeaderEnum.STATUS_KEY.getHeader())) {
+ return GrpcStatus.fromCode(Code.OK);
+ }
+ final int code = Integer.parseInt(metadata.get(TripleHeaderEnum.STATUS_KEY.getHeader()).toString());
+
+ if (Code.isOk(code)) {
+ return GrpcStatus.fromCode(Code.OK);
+ }
+ GrpcStatus status = GrpcStatus.fromCode(code);
+ if (!metadata.contains(TripleHeaderEnum.MESSAGE_KEY.getHeader())) {
+ return status;
+ }
+ final String raw = metadata.get(TripleHeaderEnum.MESSAGE_KEY.getHeader()).toString();
+ status = status.withDescription(GrpcStatus.fromMessage(raw));
+ return status;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
similarity index 61%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
index 47c24f3..4194b4f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
@@ -17,38 +17,63 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http2.Http2Error;
-
-public abstract class AbstractChannelTransportObserver implements TransportObserver {
+/**
+ * Provides loosely state management for write message to outbound.
+ */
+public abstract class OutboundTransportObserver implements TransportObserver {
protected final TransportState state = new TransportState();
@Override
public void onMetadata(Metadata metadata, boolean endStream) {
+ checkSendMeta(metadata, endStream);
+ doOnMetadata(metadata, endStream);
+ }
+
+ protected void checkSendMeta(Object metadata, boolean endStream) {
if (endStream) {
+ if (!state.allowSendEndStream()) {
+ throw new IllegalStateException("Metadata endStream already sent to peer, send " + metadata + " failed!");
+ }
state.setEndStreamSend();
} else {
+ if (!state.allowSendMeta()) {
+ throw new IllegalStateException("Metadata already sent to peer, send " + metadata + " failed!");
+ }
state.setMetaSend();
}
- doOnMetadata(metadata, endStream);
}
@Override
public void onData(byte[] data, boolean endStream) {
+ checkSendData(endStream);
+ doOnData(data, endStream);
+ }
+
+
+ protected void checkSendData(boolean endStream) {
+ if (!state.allowSendData()) {
+ throw new IllegalStateException("data has not sent to peer!");
+ }
if (endStream) {
state.setEndStreamSend();
}
- doOnData(data, endStream);
}
@Override
- public void onReset(Http2Error http2Error) {
+ public void onError(GrpcStatus status) {
+ if (!state.allowSendReset()) {
+ throw new IllegalStateException("Duplicated rst!");
+ }
state.setResetSend();
- doOnReset(http2Error);
+ doOnError(status);
}
@Override
public void onComplete() {
+ if (!state.allowSendEndStream()) {
+ throw new IllegalStateException("Stream already closed!");
+ }
state.setEndStreamSend();
doOnComplete();
}
@@ -58,7 +83,7 @@ public abstract class AbstractChannelTransportObserver implements TransportObser
protected abstract void doOnData(byte[] data, boolean endStream);
- protected abstract void doOnReset(Http2Error http2Error);
+ protected abstract void doOnError(GrpcStatus status);
protected abstract void doOnComplete();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
similarity index 88%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
index 0e09ad4..4f74ec9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
@@ -30,22 +30,18 @@ import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
-public class ServerTransportObserver extends AbstractChannelTransportObserver {
+public class ServerOutboundTransportObserver extends OutboundTransportObserver {
- private static final Logger LOGGER = LoggerFactory.getLogger(ServerTransportObserver.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerOutboundTransportObserver.class);
private final ChannelHandlerContext ctx;
- public ServerTransportObserver(ChannelHandlerContext ctx) {
+ public ServerOutboundTransportObserver(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void onMetadata(Http2Headers headers, boolean endStream) {
- if (endStream) {
- state.setEndStreamSend();
- } else {
- state.setMetaSend();
- }
+ checkSendMeta(headers, endStream);
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
.addListener(future -> {
if (!future.isSuccess()) {
@@ -81,8 +77,8 @@ public class ServerTransportObserver extends AbstractChannelTransportObserver {
}
@Override
- protected void doOnReset(Http2Error http2Error) {
- ctx.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+ protected void doOnError(GrpcStatus status) {
+ ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
.addListener(future -> {
if (!future.isSuccess()) {
LOGGER.warn("write reset error", future.cause());
@@ -101,9 +97,7 @@ public class ServerTransportObserver extends AbstractChannelTransportObserver {
}
public void onData(ByteBuf buf, boolean endStream) {
- if (endStream) {
- state.setEndStreamSend();
- }
+ checkSendData(endStream);
ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
.addListener(future -> {
if (!future.isSuccess()) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 86a811d..5623b33 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -35,8 +35,8 @@ public class ServerStream extends AbstractServerStream implements Stream {
}
@Override
- protected TransportObserver createTransportObserver() {
- return new StreamTransportObserver();
+ protected InboundTransportObserver createInboundTransportObserver() {
+ return new ServerStreamInboundTransportObserver();
}
private class ServerStreamObserverImpl implements ServerStreamObserver<Object> {
@@ -44,14 +44,14 @@ public class ServerStream extends AbstractServerStream implements Stream {
@Override
public void onNext(Object data) {
if (getState().allowSendMeta()) {
- getTransportSubscriber().onMetadata(createResponseMeta(), false);
+ outboundTransportObserver().onMetadata(createResponseMeta(), false);
}
final byte[] bytes = encodeResponse(data);
if (bytes == null) {
return;
}
if (getState().allowSendData()) {
- getTransportSubscriber().onData(bytes, false);
+ outboundTransportObserver().onData(bytes, false);
}
}
@@ -71,7 +71,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
if (!getState().allowSendEndStream()) {
return;
}
- getTransportSubscriber().onMetadata(TripleConstant.SUCCESS_RESPONSE_META, true);
+ outboundTransportObserver().onMetadata(TripleConstant.SUCCESS_RESPONSE_META, true);
}
@Override
@@ -87,7 +87,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
}
}
- private class StreamTransportObserver extends AbstractTransportObserver implements TransportObserver {
+ private class ServerStreamInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
/**
* for server stream the method only save header
@@ -114,7 +114,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
try {
RpcContext.restoreCancellationContext(getCancellationContext());
final RpcInvocation inv = buildInvocation(metadata);
- inv.setArguments(new Object[]{asStreamObserver()});
+ inv.setArguments(new Object[]{inboundMessageObserver()});
final Result result = getInvoker().invoke(inv);
if (result.hasException()) {
transportError(GrpcStatus.getStatus(result.getException()));
@@ -137,7 +137,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
public void onData(byte[] in, boolean endStream) {
execute(() -> {
try {
- if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
+ if (getMethodDescriptor().isServerStream()) {
serverStreamOnData(in);
return;
}
@@ -151,12 +151,19 @@ public class ServerStream extends AbstractServerStream implements Stream {
}
/**
+ * This method should not be called for a while
+ */
+ @Override
+ public void onError(GrpcStatus status) {
+ }
+
+ /**
* call observer onNext
*/
private void biStreamOnData(byte[] in) {
final Object[] arguments = deserializeRequest(in);
if (arguments != null) {
- getStreamSubscriber().onNext(arguments[0]);
+ outboundMessageSubscriber().onNext(arguments[0]);
}
}
@@ -177,7 +184,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
RpcInvocation inv = buildInvocation(getHeaders());
final Object[] arguments = deserializeRequest(in);
if (arguments != null) {
- inv.setArguments(new Object[]{arguments[0], asStreamObserver()});
+ inv.setArguments(new Object[]{arguments[0], inboundMessageObserver()});
final Result result = getInvoker().invoke(inv);
if (result.hasException()) {
transportError(GrpcStatus.getStatus(result.getException()));
@@ -195,12 +202,10 @@ public class ServerStream extends AbstractServerStream implements Stream {
*/
@Override
public void onComplete() {
- if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
+ if (getMethodDescriptor().isServerStream()) {
return;
}
- execute(() -> {
- getStreamSubscriber().onCompleted();
- });
+ execute(() -> outboundMessageSubscriber().onCompleted());
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
index 358cc33..890c299 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
@@ -21,10 +21,10 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
/**
- * Stream acts as a bi-directional intermediate layer for streaming data processing. It serializes object instance to
- * byte[] then send to remote, and deserializes byte[] to object instance from remote. {@link #asTransportObserver()}
- * and {@link #subscribe(AbstractChannelTransportObserver)} provide {@link TransportObserver} to send or receive remote data.
- * {@link #asStreamObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
+ * Stream acts as a bi-directional intermediate layer for processing streaming data . It serializes object instance to
+ * byte[] then send to remote, and deserializes byte[] to object instance from remote. {@link #inboundTransportObserver()}
+ * and {@link #subscribe(OutboundTransportObserver)} provide {@link TransportObserver} to receive or send remote data.
+ * {@link #inboundMessageObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
* as API for users fetching/emitting objects from/to remote peer.
*/
public interface Stream {
@@ -36,28 +36,28 @@ public interface Stream {
*
* @param observer receives remote byte[] data
*/
- void subscribe(AbstractChannelTransportObserver observer);
+ void subscribe(OutboundTransportObserver observer);
/**
* Get a downstream data observer for writing byte[] data to this stream
*
* @return an observer for writing byte[] to remote peer
*/
- TransportObserver asTransportObserver();
+ TransportObserver inboundTransportObserver();
/**
- * Register an upstream data observer to receive byte[] sent by this stream
+ * Register an upstream data observer to receive instance sent by this stream
*
- * @param observer receives remote byte[] data
+ * @param outboundMessageObserver receives remote byte[] data
*/
- void subscribe(StreamObserver<Object> observer);
+ void subscribe(StreamObserver<Object> outboundMessageObserver);
/**
* Get a downstream data observer for transmitting instances to application code
*
* @return an observer for writing byte[] to remote peer
*/
- StreamObserver<Object> asStreamObserver();
+ StreamObserver<Object> inboundMessageObserver();
/**
* Execute a task in stream's executor
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 7e5bc8f..5beebb7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -17,16 +17,40 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http2.Http2Error;
-
+/**
+ * An observer used for transport messaging which provides full streaming support.
+ * A TransportObserver receives raw data or control messages from local/remote.
+ * Implementations should prefer to extend {@link OutboundTransportObserver} and {@link InboundTransportObserver}
+ * instead of this interface.
+ */
public interface TransportObserver {
+ /**
+ * Transport metadata
+ *
+ * @param metadata metadata KV paris
+ * @param endStream whether this data should terminate the stream
+ */
void onMetadata(Metadata metadata, boolean endStream);
+ /**
+ * Transport data
+ *
+ * @param data raw byte array
+ * @param endStream whether this data should terminate the stream
+ */
void onData(byte[] data, boolean endStream);
- void onReset(Http2Error http2Error);
+ /**
+ * Error
+ *
+ * @param status error status
+ */
+ void onError(GrpcStatus status);
+ /**
+ * Set stream completed
+ */
void onComplete();
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
index 4160fd6..4f922d3 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
@@ -17,25 +17,21 @@
package org.apache.dubbo.rpc.protocol.tri;
+/**
+ * A state for recording stream
+ * A normal state transition :
+ * Meta -> (EndStream) -> Data -> (EndStream) -> (Rst)
+ */
public class TransportState {
- private volatile int state = 0;
private static final int META_SEND = 0b00000000000000000000000000000001;
private static final int RESET_SEND = 0b00000000000000000000000000000010;
private static final int END_STREAM_SEND = 0b00000000000000000000000000000100;
private static final int SERVER_SEND_STREAM_RECEIVED = 0b00000000000000000000000000001000;
-
private static final int ALLOW_META_SEND = 0b00000000000000000000000000000000;
private static final int ALLOW_DATA_SEND = META_SEND;
private static final int ALLOW_END_STREAM_SEND = META_SEND;
- private static final int ALLOW_RESET_SEND = 0b00000000000000000000000000000001;
-
- public TransportState() {
- }
-
- public void setState(int state) {
- this.state = state;
- }
+ private volatile int state = 0;
public void setMetaSend() {
this.state = this.state | META_SEND;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index 377b093..007f235 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -26,7 +26,7 @@ public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
final byte[] data = (byte[]) msg;
if (clientStream != null) {
- clientStream.asTransportObserver()
+ clientStream.inboundTransportObserver()
.onData(data, false);
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index f215c27..ba2da9a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -45,7 +45,6 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
}
private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
- DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
Connection connection = Connection.getConnectionFromChannel(ctx.channel());
final AbstractClientStream stream = AbstractClientStream.newClientStream(req, connection);
final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index 0a35d2b..eb14bee 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
@@ -32,7 +31,7 @@ import io.netty.handler.codec.http2.Http2StreamFrame;
import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
public final class TripleHttp2ClientResponseHandler extends SimpleChannelInboundHandler<Http2StreamFrame> {
- private static final Logger logger = LoggerFactory.getLogger(TripleHttp2ClientResponseHandler.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TripleHttp2ClientResponseHandler.class);
public TripleHttp2ClientResponseHandler() {
super(false);
@@ -44,7 +43,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
if (evt instanceof Http2GoAwayFrame) {
Http2GoAwayFrame event = (Http2GoAwayFrame) evt;
ctx.close();
- logger.debug(
+ LOGGER.debug(
"Event triggered, event name is: " + event.name() + ", last stream id is: " + event.lastStreamId());
} else if (evt instanceof Http2ResetFrame) {
onResetRead(ctx, (Http2ResetFrame) evt);
@@ -64,7 +63,8 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame) {
final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
- clientStream.cancelByRemote(Http2Error.valueOf(resetFrame.errorCode()));
+ LOGGER.warn("Triple Client received remote reset errorCode=" + resetFrame.errorCode());
+ clientStream.cancelByRemote();
ctx.close();
}
@@ -85,7 +85,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
}
}
}
- final TransportObserver observer = clientStream.asTransportObserver();
+ final TransportObserver observer = clientStream.inboundTransportObserver();
observer.onMetadata(new Http2HeaderMeta(headers), false);
if (msg.isEndStream()) {
observer.onComplete();
@@ -100,8 +100,8 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
Metadata metadata = new DefaultMetadata();
metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(status.code.code));
metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
- logger.warn("Meet Exception on ClientResponseHandler, status code is: " + status.code, cause);
- clientStream.asStreamObserver().onError(status.asException());
+ LOGGER.warn("Meet Exception on ClientResponseHandler, status code is: " + status.code, cause);
+ clientStream.inboundMessageObserver().onError(status.asException());
ctx.close();
}
@@ -111,7 +111,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
// stream already closed;
if (clientStream != null) {
- clientStream.asTransportObserver().onComplete();
+ clientStream.inboundTransportObserver().onComplete();
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 46207bb..8968d0d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -38,12 +38,11 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Frame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2ResetFrame;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
import java.util.List;
@@ -66,11 +65,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
onHeadersRead(ctx, (Http2HeadersFrame) msg);
} else if (msg instanceof Http2DataFrame) {
onDataRead(ctx, (Http2DataFrame) msg);
- } else if (msg instanceof Http2Frame) {
+ } else if (msg instanceof ReferenceCounted) {
// ignored
ReferenceCountUtil.release(msg);
- } else {
- super.channelRead(ctx, msg);
}
}
@@ -84,9 +81,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
}
public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
- Http2Error http2Error = Http2Error.valueOf(frame.errorCode());
final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
- serverStream.cancelByRemote(http2Error);
+ LOGGER.warn("Triple Server received remote reset errorCode=" + frame.errorCode());
+ serverStream.cancelByRemote();
ctx.close();
}
@@ -106,7 +103,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
if (msg.isEndStream()) {
final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
if (serverStream != null) {
- serverStream.asTransportObserver().onComplete();
+ serverStream.inboundTransportObserver().onComplete();
}
}
}
@@ -126,7 +123,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
final Http2Headers headers = msg.headers();
- ServerTransportObserver transportObserver = new ServerTransportObserver(ctx);
+ ServerOutboundTransportObserver transportObserver = new ServerOutboundTransportObserver(ctx);
if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
responsePlainTextError(transportObserver, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
@@ -243,7 +240,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
stream.methods(methodDescriptors);
}
- final TransportObserver observer = stream.asTransportObserver();
+ final TransportObserver observer = stream.inboundTransportObserver();
observer.onMetadata(new Http2HeaderMeta(headers), false);
if (msg.isEndStream()) {
observer.onComplete();
@@ -261,7 +258,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
}
- private void responsePlainTextError(ServerTransportObserver observer, int code, GrpcStatus status) {
+ private void responsePlainTextError(ServerOutboundTransportObserver observer, int code, GrpcStatus status) {
Http2Headers headers = new DefaultHttp2Headers(true)
.status(String.valueOf(code))
.setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
@@ -271,7 +268,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
observer.onData(status.description, true);
}
- private void responseErr(ServerTransportObserver observer, GrpcStatus status) {
+ private void responseErr(ServerOutboundTransportObserver observer, GrpcStatus status) {
Http2Headers trailers = new DefaultHttp2Headers()
.status(OK.codeAsText())
.set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 940a824..d2b124a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import grpc.health.v1.HealthCheckResponse;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -33,6 +32,8 @@ import org.apache.dubbo.rpc.protocol.AbstractExporter;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.protocol.tri.service.TriBuiltinService;
+import grpc.health.v1.HealthCheckResponse;
+
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
index a3156f3..a4d7787 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
@@ -26,10 +26,25 @@ import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2PingFrame;
import io.netty.util.ReferenceCountUtil;
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.HashSet;
+import java.util.Set;
+
import static org.apache.dubbo.rpc.protocol.tri.GracefulShutdown.GRACEFUL_SHUTDOWN_PING;
public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(TripleServerConnectionHandler.class);
+ // Some exceptions are not very useful and add too much noise to the log
+ private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
+ private static final Set<Class<?>> QUIET_EXCEPTIONS_CLASS = new HashSet<>();
+
+ static {
+ QUIET_EXCEPTIONS.add("NativeIoException");
+ QUIET_EXCEPTIONS_CLASS.add(IOException.class);
+ QUIET_EXCEPTIONS_CLASS.add(SocketException.class);
+ }
+
private GracefulShutdown gracefulShutdown;
@Override
@@ -50,6 +65,13 @@ public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
}
}
+ private boolean isQuiteException(Throwable t) {
+ if (QUIET_EXCEPTIONS_CLASS.contains(t.getClass())) {
+ return true;
+ }
+ return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
+ }
+
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
@@ -58,7 +80,7 @@ public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// this may be change in future follow https://github.com/apache/dubbo/pull/8644
- if (TripleUtil.isQuiteException(cause)) {
+ if (isQuiteException(cause)) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel:%s Error", ctx.channel()), cause);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 6728b09..7046bf2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -25,7 +25,7 @@ public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
final byte[] data = (byte[]) msg;
if (serverStream != null) {
- serverStream.asTransportObserver()
+ serverStream.inboundTransportObserver()
.onData(data, false);
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
deleted file mode 100644
index 2fe6687..0000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.serialize.MultipleSerialization;
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.triple.TripleWrapper;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.ErrorInfo;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketException;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TripleUtil {
- // Some exceptions are not very useful and add too much noise to the log
- private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
- private static final Set<Class<?>> QUIET_EXCEPTIONS_CLASS = new HashSet<>();
- private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
- private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder().withoutPadding();
-
- static {
- QUIET_EXCEPTIONS.add("NativeIoException");
- QUIET_EXCEPTIONS_CLASS.add(IOException.class);
- QUIET_EXCEPTIONS_CLASS.add(SocketException.class);
- }
-
- public static boolean isQuiteException(Throwable t) {
- if (QUIET_EXCEPTIONS_CLASS.contains(t.getClass())) {
- return true;
- }
- return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
- }
-
- public static Object unwrapResp(URL url, TripleWrapper.TripleResponseWrapper wrap,
- MultipleSerialization serialization) {
- String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
- try {
- final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
- final Object ret = serialization.deserialize(url, serializeType, wrap.getType(), bais);
- bais.close();
- return ret;
- } catch (Exception e) {
- throw new RuntimeException("Failed to unwrap resp", e);
- }
- }
-
- public static Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
- Map<Class<?>, Object> map = new HashMap<>();
- try {
- for (Any any : detailList) {
- if (any.is(ErrorInfo.class)) {
- ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
- map.putIfAbsent(ErrorInfo.class, errorInfo);
- } else if (any.is(DebugInfo.class)) {
- DebugInfo debugInfo = any.unpack(DebugInfo.class);
- map.putIfAbsent(DebugInfo.class, debugInfo);
- }
- // support others type but now only support this
- }
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- return map;
- }
-
- public static Object[] unwrapReq(URL url, TripleWrapper.TripleRequestWrapper wrap,
- MultipleSerialization multipleSerialization) {
- String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
- try {
- Object[] arguments = new Object[wrap.getArgsCount()];
- for (int i = 0; i < arguments.length; i++) {
- final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getArgs(i).toByteArray());
- Object obj = multipleSerialization.deserialize(url,
- serializeType, wrap.getArgTypes(i), bais);
- arguments[i] = obj;
- }
- return arguments;
- } catch (Exception e) {
- throw new RuntimeException("Failed to unwrap req: " + e.getMessage(), e);
- }
- }
-
- public static TripleWrapper.TripleResponseWrapper wrapResp(URL url, String serializeType, Object resp,
- MethodDescriptor desc,
- MultipleSerialization multipleSerialization) {
- try {
- final TripleWrapper.TripleResponseWrapper.Builder builder = TripleWrapper.TripleResponseWrapper.newBuilder()
- .setType(desc.getReturnClass().getName())
- .setSerializeType(convertHessianToWrapper(serializeType));
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- multipleSerialization.serialize(url, serializeType, desc.getReturnClass().getName(), resp, bos);
- builder.setData(ByteString.copyFrom(bos.toByteArray()));
- bos.close();
- return builder.build();
- } catch (IOException e) {
- throw new RuntimeException("Failed to pack wrapper req", e);
- }
- }
-
-
- public static TripleWrapper.TripleRequestWrapper wrapReq(URL url, String serializeType, Object req,
- String type,
- MultipleSerialization multipleSerialization) {
- try {
- final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
- .addArgTypes(type)
- .setSerializeType(convertHessianToWrapper(serializeType));
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- multipleSerialization.serialize(url, serializeType, type, req, bos);
- builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
- bos.close();
- return builder.build();
- } catch (IOException e) {
- throw new RuntimeException("Failed to pack wrapper req", e);
- }
- }
-
- public static TripleWrapper.TripleRequestWrapper wrapReq(URL url, RpcInvocation invocation,
- MultipleSerialization serialization) {
- try {
- String serializationName = (String) invocation.getObjectAttachment(Constants.SERIALIZATION_KEY);
- final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
- .setSerializeType(convertHessianToWrapper(serializationName));
- for (int i = 0; i < invocation.getArguments().length; i++) {
- final String clz = invocation.getParameterTypes()[i].getName();
- builder.addArgTypes(clz);
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- serialization.serialize(url, serializationName, clz, invocation.getArguments()[i], bos);
- builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
- }
- return builder.build();
- } catch (IOException e) {
- throw new RuntimeException("Failed to pack wrapper req", e);
- }
- }
-
- public static <T> T unpack(byte[] data, Class<T> clz) {
- return unpack(new ByteArrayInputStream(data), clz);
- }
-
- public static <T> T unpack(InputStream is, Class<T> clz) {
- try {
- final T req = SingleProtobufUtils.deserialize(is, clz);
- is.close();
- return req;
- } catch (IOException e) {
- throw new RuntimeException("Failed to unpack req", e);
- } finally {
- closeQuietly(is);
- }
- }
-
- private static void closeQuietly(Closeable c) {
- if (c != null) {
- try {
- c.close();
- } catch (IOException ignore) {
- // ignored
- }
- }
- }
-
- public static byte[] pack(Object obj) {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try {
- SingleProtobufUtils.serialize(obj, baos);
- } catch (IOException e) {
- throw new RuntimeException("Failed to pack protobuf object", e);
- }
- return baos.toByteArray();
- }
-
- public static String encodeWrapper(URL url, Object obj, String serializeType, MultipleSerialization serialization)
- throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- serialization.serialize(url, serializeType, obj.getClass().getName(), obj, bos);
- final TripleWrapper.TripleRequestWrapper wrap = TripleWrapper.TripleRequestWrapper.newBuilder()
- .setSerializeType(convertHessianToWrapper(serializeType))
- .addArgTypes(obj.getClass().getName())
- .addArgs(ByteString.copyFrom(bos.toByteArray()))
- .build();
- return encodeBase64ASCII(wrap.toByteArray());
- }
-
- public static String encodeBase64ASCII(byte[] in) {
- byte[] bytes = encodeBase64(in);
- return new String(bytes, StandardCharsets.US_ASCII);
- }
-
- public static byte[] encodeBase64(byte[] in) {
- return BASE64_ENCODER.encode(in);
- }
-
- public static Object decodeObjFromHeader(URL url, CharSequence value, MultipleSerialization serialization)
- throws InvalidProtocolBufferException {
- final byte[] decode = decodeASCIIByte(value);
- final TripleWrapper.TripleRequestWrapper wrapper = TripleWrapper.TripleRequestWrapper.parseFrom(decode);
- final Object[] objects = TripleUtil.unwrapReq(url, wrapper, serialization);
- return objects[0];
- }
-
- public static byte[] decodeASCIIByte(CharSequence value) {
- return BASE64_DECODER.decode(value.toString().getBytes(StandardCharsets.US_ASCII));
- }
-
- public static String convertHessianToWrapper(String serializeType) {
- if (TripleConstant.HESSIAN2.equals(serializeType)) {
- return TripleConstant.HESSIAN4;
- }
- return serializeType;
- }
-
- public static String convertHessianFromWrapper(String serializeType) {
- if (TripleConstant.HESSIAN4.equals(serializeType)) {
- return TripleConstant.HESSIAN2;
- }
- return serializeType;
- }
-
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index dc98859..774b98a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -24,9 +24,12 @@ import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcException;
import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.DebugInfo;
+import com.google.rpc.ErrorInfo;
import com.google.rpc.Status;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,41 +41,67 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
@Override
protected void doOnStartCall() {
- asStreamObserver().onNext(getRpcInvocation());
- asStreamObserver().onCompleted();
+ inboundMessageObserver().onNext(getRpcInvocation());
+ inboundMessageObserver().onCompleted();
}
@Override
- protected TransportObserver createTransportObserver() {
- return new UnaryClientTransportObserver();
+ protected InboundTransportObserver createInboundTransportObserver() {
+ return new ClientUnaryInboundTransportObserver();
}
- private class UnaryClientTransportObserver extends UnaryTransportObserver implements TransportObserver {
+ private Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
+ Map<Class<?>, Object> map = new HashMap<>();
+ try {
+ for (Any any : detailList) {
+ if (any.is(ErrorInfo.class)) {
+ ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
+ map.putIfAbsent(ErrorInfo.class, errorInfo);
+ } else if (any.is(DebugInfo.class)) {
+ DebugInfo debugInfo = any.unpack(DebugInfo.class);
+ map.putIfAbsent(DebugInfo.class, debugInfo);
+ }
+ // support others type but now only support this
+ }
+ } catch (InvalidProtocolBufferException e) {
+ e.printStackTrace();
+ }
+ return map;
+ }
+
+ private class ClientUnaryInboundTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
@Override
- public void doOnComplete() {
- try {
- AppResponse result;
- if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
- final Object resp = deserializeResponse(getData());
- result = new AppResponse(resp);
+ public void onComplete() {
+ execute(() -> {
+ final GrpcStatus status = extractStatusFromMeta(getHeaders());
+ if (GrpcStatus.Code.isOk(status.code.code)) {
+ try {
+ AppResponse result;
+ if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
+ final Object resp = deserializeResponse(getData());
+ result = new AppResponse(resp);
+ } else {
+ result = new AppResponse();
+ }
+ Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+ result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
+ response.setResult(result);
+ DefaultFuture2.received(getConnection(), response);
+ } catch (Exception e) {
+ final GrpcStatus clientStatus = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withCause(e)
+ .withDescription("Failed to deserialize response");
+ onError(clientStatus);
+ }
} else {
- result = new AppResponse();
+ onError(status);
}
- Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
- result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
- response.setResult(result);
- DefaultFuture2.received(getConnection(), response);
- } catch (Exception e) {
- final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(e)
- .withDescription("Failed to deserialize response");
- onError(status);
- }
+ });
}
@Override
- protected void onError(GrpcStatus status) {
+ public void onError(GrpcStatus status) {
Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
response.setErrorMessage(status.description);
final AppResponse result = new AppResponse();
@@ -96,12 +125,12 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
return null;
}
final CharSequence raw = metadata.get(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader());
- byte[] statusDetailBin = TripleUtil.decodeASCIIByte(raw);
+ byte[] statusDetailBin = decodeASCIIByte(raw);
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
- final Status statusDetail = TripleUtil.unpack(statusDetailBin, Status.class);
+ final Status statusDetail = unpack(statusDetailBin, Status.class);
List<Any> detailList = statusDetail.getDetailsList();
- Map<Class<?>, Object> classObjectMap = TripleUtil.tranFromStatusDetails(detailList);
+ Map<Class<?>, Object> classObjectMap = tranFromStatusDetails(detailList);
// get common exception from DebugInfo
DebugInfo debugInfo = (DebugInfo) classObjectMap.get(DebugInfo.class);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
similarity index 62%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
index 377b093..8e9ed66 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
@@ -14,20 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+abstract class UnaryInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
+ protected static final String DUPLICATED_DATA = "Duplicated data";
-public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
+ private byte[] data;
- final byte[] data = (byte[]) msg;
- if (clientStream != null) {
- clientStream.asTransportObserver()
- .onData(data, false);
+ public byte[] getData() {
+ return data;
+ }
+
+ @Override
+ public void onData(byte[] in, boolean endStream) {
+ if (data == null) {
+ this.data = in;
+ } else {
+ onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription(DUPLICATED_DATA));
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 8c3d98b..663ef48 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -41,18 +41,18 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
}
@Override
- protected TransportObserver createTransportObserver() {
+ protected InboundTransportObserver createInboundTransportObserver() {
return new UnaryServerTransportObserver();
}
- private class UnaryServerTransportObserver extends UnaryTransportObserver implements TransportObserver {
+ private class UnaryServerTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
@Override
- protected void onError(GrpcStatus status) {
+ public void onError(GrpcStatus status) {
transportError(status);
}
@Override
- public void doOnComplete() {
+ public void onComplete() {
if (getData() != null) {
invoke();
} else {
@@ -82,15 +82,15 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
return;
}
Metadata metadata = createResponseMeta();
- getTransportSubscriber().onMetadata(metadata, false);
+ outboundTransportObserver().onMetadata(metadata, false);
final byte[] data = encodeResponse(response.getValue());
if (data == null) {
return;
}
- getTransportSubscriber().onData(data, false);
+ outboundTransportObserver().onData(data, false);
Metadata trailers = TripleConstant.SUCCESS_RESPONSE_META;
convertAttachment(trailers, response.getObjectAttachments());
- getTransportSubscriber().onMetadata(trailers, true);
+ outboundTransportObserver().onMetadata(trailers, true);
});
RpcContext.removeContext();
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
index ce6d7be..78a0038 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
@@ -47,6 +47,12 @@ class TransportStateTest {
transportState = new TransportState();
Assertions.assertTrue(transportState.allowSendMeta());
+
+ transportState = new TransportState();
+ if (transportState.allowSendMeta()) {
+ transportState.setMetaSend();
+ }
+ Assertions.assertFalse(transportState.allowSendMeta());
}
@Test
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 0cddc3c..0485e15 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -36,7 +36,7 @@ class UnaryClientStreamTest {
public void testInit() {
URL url = new ServiceConfigURL("test", "1.2.3.4", 8080);
final UnaryClientStream stream = UnaryClientStream.unary(url);
- final StreamObserver<Object> observer = stream.asStreamObserver();
+ final StreamObserver<Object> observer = stream.inboundMessageObserver();
RpcInvocation inv = Mockito.mock(RpcInvocation.class);
when(inv.getModuleModel()).thenReturn(ApplicationModel.defaultModel().getDefaultModule());
// no invoker
@@ -44,7 +44,7 @@ class UnaryClientStreamTest {
// no subscriber
Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
- AbstractChannelTransportObserver transportObserver = Mockito.mock(AbstractChannelTransportObserver.class);
+ OutboundTransportObserver transportObserver = Mockito.mock(OutboundTransportObserver.class);
stream.subscribe(transportObserver);
// no method descriptor
Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));