You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/21 06:23:46 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] Refactor transport state
and other code (#9057)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new bf3548d [3.0-Triple] Refactor transport state and other code (#9057)
bf3548d is described below
commit bf3548d50162aff07b3dcc216174a806646f174b
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Oct 21 14:23:32 2021 +0800
[3.0-Triple] Refactor transport state and other code (#9057)
* refactor(tri): refactor transport state and other code
* fix rat
* remove unused code
* refactor stream call
* fix style
* refactor client stream observer
* fix ut
* finish client refactor
* refactor client construct the stream
* fix ut
* fix comment
* avoid block
* avoid block
* fix ut
* remove response error in util
* Abstract client and server transport
* fix comment
* fix ut
* Optimize interface
* Optimize promise
* Fix ut
* remove unused code
* Start call only when the channel creation is successful
* start call switch user threads
Co-authored-by: guohao <gu...@gmail.com>
---
.../apache/dubbo/rpc/model/MethodDescriptor.java | 4 +
.../tri/AbstractChannelTransportObserver.java | 76 +++++++
.../rpc/protocol/tri/AbstractClientStream.java | 252 ++++++++++++++++++---
.../rpc/protocol/tri/AbstractServerStream.java | 60 +++--
.../dubbo/rpc/protocol/tri/AbstractStream.java | 95 ++++----
.../dubbo/rpc/protocol/tri/ClientStream.java | 112 ++++-----
.../rpc/protocol/tri/ClientTransportObserver.java | 90 ++------
.../apache/dubbo/rpc/protocol/tri/Compressor.java | 2 +
.../apache/dubbo/rpc/protocol/tri/GrpcStatus.java | 38 +++-
.../dubbo/rpc/protocol/tri/IdentityCompressor.java | 1 -
.../dubbo/rpc/protocol/tri/ServerStream.java | 35 ++-
.../rpc/protocol/tri/ServerTransportObserver.java | 80 ++++---
.../org/apache/dubbo/rpc/protocol/tri/Stream.java | 4 +-
.../dubbo/rpc/protocol/tri/TransportObserver.java | 13 +-
.../dubbo/rpc/protocol/tri/TransportState.java | 76 +++++++
.../protocol/tri/TripleClientRequestHandler.java | 115 ++--------
.../dubbo/rpc/protocol/tri/TripleConstant.java | 42 ++--
.../tri/TripleHttp2FrameServerHandler.java | 100 +++++---
.../apache/dubbo/rpc/protocol/tri/TripleUtil.java | 48 +---
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 93 +++-----
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 109 ++-------
.../dubbo/rpc/protocol/tri/TransportStateTest.java | 116 ++++++++++
.../rpc/protocol/tri/UnaryClientStreamTest.java | 5 +-
23 files changed, 954 insertions(+), 612 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index 91c9887..93148df 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -122,6 +122,10 @@ public class MethodDescriptor {
return rpcType.equals(RpcType.SERVER_STREAM) || rpcType.equals(RpcType.BIDIRECTIONAL_STREAM) || rpcType.equals(RpcType.CLIENT_STREAM);
}
+ public boolean isServerStream() {
+ return RpcType.SERVER_STREAM.equals(rpcType);
+ }
+
public boolean isUnary() {
return rpcType.equals(RpcType.UNARY);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
new file mode 100644
index 0000000..47c24f3
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import io.netty.handler.codec.http2.Http2Error;
+
+public abstract class AbstractChannelTransportObserver implements TransportObserver {
+
+ protected final TransportState state = new TransportState();
+
+ @Override
+ public void onMetadata(Metadata metadata, boolean endStream) {
+ if (endStream) {
+ state.setEndStreamSend();
+ } else {
+ state.setMetaSend();
+ }
+ doOnMetadata(metadata, endStream);
+ }
+
+ @Override
+ public void onData(byte[] data, boolean endStream) {
+ if (endStream) {
+ state.setEndStreamSend();
+ }
+ doOnData(data, endStream);
+ }
+
+ @Override
+ public void onReset(Http2Error http2Error) {
+ state.setResetSend();
+ doOnReset(http2Error);
+ }
+
+ @Override
+ public void onComplete() {
+ state.setEndStreamSend();
+ doOnComplete();
+ }
+
+
+ protected abstract void doOnMetadata(Metadata metadata, boolean endStream);
+
+ protected abstract void doOnData(byte[] data, boolean endStream);
+
+ protected abstract void doOnReset(Http2Error http2Error);
+
+ protected abstract void doOnComplete();
+
+
+ protected int calcCompressFlag(Compressor compressor) {
+ if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+ return 0;
+ }
+ return 1;
+ }
+
+}
+
+
+
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index b794edb..f7dab7e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -18,34 +18,62 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.Connection;
+import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ConsumerModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.ServiceModel;
import org.apache.dubbo.triple.TripleWrapper;
+import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.util.AsciiString;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
public abstract class AbstractClientStream extends AbstractStream implements Stream {
+
+ private final AsciiString scheme;
private ConsumerModel consumerModel;
private Connection connection;
+ private RpcInvocation rpcInvocation;
+ private long requestId;
protected AbstractClientStream(URL url) {
super(url);
+ this.scheme = getSchemeFromUrl(url);
+ // On client-side cancellation, send a reset (RST) frame to the server
+ this.getCancellationContext().addListener(context -> {
+ Throwable throwable = this.getCancellationContext().getCancellationCause();
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn("Triple request to "
+ + getConsumerModel().getServiceName() + "#" + getMethodName() +
+ " was canceled by local exception ", throwable);
+ }
+ this.asTransportObserver().onReset(getHttp2Error(throwable));
+ });
}
- protected AbstractClientStream(URL url, Executor executor) {
- super(url, executor);
- }
public static UnaryClientStream unary(URL url) {
return new UnaryClientStream(url);
@@ -55,38 +83,114 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
return new ClientStream(url);
}
- public static AbstractClientStream newClientStream(URL url, boolean unary) {
- AbstractClientStream stream = unary ? unary(url) : stream(url);
- final CancellationContext cancellationContext = stream.getCancellationContext();
- // for client cancel,send rst frame to server
- cancellationContext.addListener(context -> {
- if (LOGGER.isWarnEnabled()) {
- Throwable throwable = cancellationContext.getCancellationCause();
- LOGGER.warn("Cancel by local throwable is ", throwable);
+ public static AbstractClientStream newClientStream(Request req, Connection connection) {
+ final RpcInvocation inv = (RpcInvocation) req.getData();
+ final URL url = inv.getInvoker().getUrl();
+ ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
+ MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel, inv);
+ ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
+ AbstractClientStream stream = methodDescriptor.isUnary() ? unary(url) : stream(url);
+ Compressor compressor = getCompressor(url, consumerModel);
+ stream.request(req)
+ .service(consumerModel)
+ .connection(connection)
+ .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+ .method(methodDescriptor)
+ .setCompressor(compressor);
+ return stream;
+ }
+
+ protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
+ execute(() -> {
+ channel.pipeline()
+ .addLast(new TripleHttp2ClientResponseHandler())
+ .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
+ .addLast(new TripleClientInboundHandler());
+ channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(this);
+ final ClientTransportObserver clientTransportObserver = new ClientTransportObserver(channel, promise);
+ subscribe(clientTransportObserver);
+ try {
+ doOnStartCall();
+ } catch (Throwable throwable) {
+ cancel(throwable);
+ DefaultFuture2.getFuture(getRequestId()).cancel();
}
- stream.asTransportObserver().onReset(Http2Error.CANCEL);
});
- return stream;
}
- public AbstractClientStream service(ConsumerModel model) {
- this.consumerModel = model;
- return this;
+ protected abstract void doOnStartCall();
+
+ @Override
+ protected StreamObserver<Object> createStreamObserver() {
+ return new ClientStreamObserverImpl(getCancellationContext());
}
- public ConsumerModel getConsumerModel() {
- return consumerModel;
+ protected class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+
+ public ClientStreamObserverImpl(CancellationContext cancellationContext) {
+ super(cancellationContext);
+ }
+
+ @Override
+ public void onNext(Object data) {
+ if (getState().allowSendMeta()) {
+ final Metadata metadata = createRequestMeta(getRpcInvocation());
+ getTransportSubscriber().onMetadata(metadata, false);
+ }
+ if (getState().allowSendData()) {
+ final byte[] bytes = encodeRequest(data);
+ getTransportSubscriber().onData(bytes, false);
+ }
+ }
+
+ /**
+ * Handles all exceptions raised during the request process; the other procedures throw directly.
+ * <p>
+ * The other procedures are {@link ClientStreamObserver#onNext(Object)} and {@link ClientStreamObserver#onCompleted()}
+ */
+ @Override
+ public void onError(Throwable throwable) {
+ if (getState().allowSendEndStream()) {
+ GrpcStatus status = GrpcStatus.getStatus(throwable);
+ transportError(status, null, getState().allowSendMeta());
+ } else {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("Triple request to "
+ + getConsumerModel().getServiceName() + "#" + getMethodName() +
+ " was failed by exception ", throwable);
+ }
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ if (getState().allowSendEndStream()) {
+ getTransportSubscriber().onComplete();
+ }
+ }
+
+ @Override
+ public void setCompression(String compression) {
+ if (!getState().allowSendMeta()) {
+ cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+ return;
+ }
+ Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+ setCompressor(compressor);
+ }
}
- public AbstractClientStream connection(Connection connection) {
- this.connection = connection;
- return this;
+ @Override
+ protected void cancelByRemoteReset(Http2Error http2Error) {
+ DefaultFuture2.getFuture(getRequestId()).cancel();
}
- public Connection getConnection() {
- return connection;
+ @Override
+ protected void cancelByLocal(Throwable throwable) {
+ getCancellationContext().cancel(throwable);
}
+
@Override
public void execute(Runnable runnable) {
try {
@@ -104,6 +208,59 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
}
}
+ public AbstractClientStream service(ConsumerModel model) {
+ this.consumerModel = model;
+ return this;
+ }
+
+ public AbstractClientStream request(Request request) {
+ this.requestId = request.getId();
+ this.rpcInvocation = (RpcInvocation) request.getData();
+ return this;
+ }
+
+ protected RpcInvocation getRpcInvocation() {
+ return this.rpcInvocation;
+ }
+
+ public AsciiString getScheme() {
+ return scheme;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ private AsciiString getSchemeFromUrl(URL url) {
+ try {
+ Boolean ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY, Boolean.class);
+ if (ssl == null) {
+ return TripleConstant.HTTP_SCHEME;
+ }
+ return ssl ? TripleConstant.HTTPS_SCHEME : TripleConstant.HTTP_SCHEME;
+ } catch (Exception e) {
+ return TripleConstant.HTTP_SCHEME;
+ }
+ }
+
+ private Http2Error getHttp2Error(Throwable throwable) {
+ // todo Convert the exception to http2Error
+ return Http2Error.CANCEL;
+ }
+
+ public ConsumerModel getConsumerModel() {
+ return consumerModel;
+ }
+
+ public AbstractClientStream connection(Connection connection) {
+ this.connection = connection;
+ return this;
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
protected byte[] encodeRequest(Object value) {
final byte[] out;
final Object obj;
@@ -114,7 +271,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
obj = getRequestValue(value);
}
out = TripleUtil.pack(obj);
-
return super.compress(out);
}
@@ -133,7 +289,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
RpcInvocation invocation = (RpcInvocation) value;
return invocation.getArguments()[0];
}
-
return value;
}
@@ -162,11 +317,16 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
protected Metadata createRequestMeta(RpcInvocation inv) {
Metadata metadata = new DefaultMetadata();
- metadata.put(TripleHeaderEnum.PATH_KEY.getHeader(), "/" + inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName())
- .put(TripleHeaderEnum.AUTHORITY_KEY.getHeader(), getUrl().getAddress())
- .put(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO)
+ // put http2 params
+ metadata.put(Http2Headers.PseudoHeaderName.SCHEME.value(), this.getScheme())
+ .put(Http2Headers.PseudoHeaderName.PATH.value(), getMethodPath(inv))
+ .put(Http2Headers.PseudoHeaderName.AUTHORITY.value(), getUrl().getAddress())
+ .put(Http2Headers.PseudoHeaderName.METHOD.value(), HttpMethod.POST.asciiName());
+
+ metadata.put(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO)
.put(TripleHeaderEnum.TIMEOUT.getHeader(), inv.get(CommonConstants.TIMEOUT_KEY) + "m")
- .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
+ .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS)
+ ;
metadata.putIfNotNull(TripleHeaderEnum.SERVICE_VERSION.getHeader(), getUrl().getVersion())
.putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
@@ -183,15 +343,33 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
return metadata;
}
- @Override
- protected void cancelByRemoteReset(Http2Error http2Error) {
- DefaultFuture2.getFuture(getRequest().getId()).cancel();
+ private String getMethodPath(RpcInvocation inv) {
+ return "/" + inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName();
}
- @Override
- protected void cancelByLocal(Throwable throwable) {
- getCancellationContext().cancel(throwable);
+ private static Compressor getCompressor(URL url, ServiceModel model) {
+ String compressorStr = url.getParameter(COMPRESSOR_KEY);
+ if (compressorStr == null) {
+ // Compressor can not be set by dynamic config
+ compressorStr = ConfigurationUtils
+ .getCachedDynamicProperty(model.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+ }
+ return Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
}
-
+ /**
+ * Get the tri-protocol-specific MethodDescriptor whose real parameter classes match the invocation's parameter types
+ */
+ private static MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
+ List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
+ if (CollectionUtils.isEmpty(methodDescriptors)) {
+ throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+ }
+ for (MethodDescriptor methodDescriptor : methodDescriptors) {
+ if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
+ return methodDescriptor;
+ }
+ }
+ throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 567f45a..f566641 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -31,7 +31,10 @@ import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.triple.TripleWrapper;
import com.google.protobuf.Message;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
import java.util.Arrays;
import java.util.List;
@@ -45,9 +48,10 @@ import static org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KE
public abstract class AbstractServerStream extends AbstractStream implements Stream {
private final ProviderModel providerModel;
+ private final List<HeaderFilter> headerFilters;
+ private ServiceDescriptor serviceDescriptor;
private List<MethodDescriptor> methodDescriptors;
private Invoker<?> invoker;
- private final List<HeaderFilter> headerFilters;
protected AbstractServerStream(URL url) {
this(url, lookupProviderModel(url));
@@ -102,6 +106,14 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
return this;
}
+ public ServiceDescriptor getServiceDescriptor() {
+ return serviceDescriptor;
+ }
+
+ public void setServiceDescriptor(ServiceDescriptor serviceDescriptor) {
+ this.serviceDescriptor = serviceDescriptor;
+ }
+
public Invoker<?> getInvoker() {
return invoker;
}
@@ -134,7 +146,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
if (getProviderModel() != null) {
- ClassLoadUtil.switchContextLoader(getProviderModel().getClassLoader());
+ ClassLoadUtil.switchContextLoader(getProviderModel().getServiceInterfaceClass().getClassLoader());
}
if (getMethodDescriptor() == null || getMethodDescriptor().isNeedWrap()) {
final TripleWrapper.TripleRequestWrapper wrapper = TripleUtil.unpack(data,
@@ -160,16 +172,18 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
.withDescription("Method :" + getMethodName() + "[" + Arrays.toString(paramTypes) + "] " +
"not found of service:" + getServiceDescriptor().getServiceName()));
-
return null;
}
}
-
return TripleUtil.unwrapReq(getUrl(), wrapper, getMultipleSerialization());
} else {
return new Object[]{TripleUtil.unpack(data, getMethodDescriptor().getParameterClasses()[0])};
}
-
+ } catch (Throwable throwable) {
+ LOGGER.warn("Decode request failed:", throwable);
+ transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription("Decode request failed:" + throwable.getMessage()));
+ return null;
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}
@@ -178,23 +192,39 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
/**
* create basic meta data
*/
- protected Metadata createRequestMeta() {
+ protected Metadata createResponseMeta() {
Metadata metadata = new DefaultMetadata();
+ metadata.put(Http2Headers.PseudoHeaderName.STATUS.value(), HttpResponseStatus.OK.codeAsText());
+ metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), super.getCompressor().getMessageEncoding())
- .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
+ .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), getAcceptEncoding());
return metadata;
}
protected byte[] encodeResponse(Object value) {
- final com.google.protobuf.Message message;
- if (getMethodDescriptor().isNeedWrap()) {
- message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
- getMultipleSerialization());
- } else {
- message = (Message) value;
+ final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ try {
+ if (getProviderModel() != null) {
+ ClassLoadUtil.switchContextLoader(getProviderModel().getServiceInterfaceClass().getClassLoader());
+ }
+ final Message message;
+ if (getMethodDescriptor().isNeedWrap()) {
+ message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
+ getMultipleSerialization());
+ } else {
+ message = (Message) value;
+ }
+ byte[] out = TripleUtil.pack(message);
+ return super.compress(out);
+ } catch (Throwable throwable) {
+ LOGGER.error("Encode Response data error ", throwable);
+ transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
+ .withCause(throwable)
+ .withDescription("Encode Response data error"));
+ return null;
+ } finally {
+ ClassLoadUtil.switchContextLoader(tccl);
}
- byte[] out = TripleUtil.pack(message);
- return super.compress(out);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 1f4f19a..aaac6d8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -25,11 +25,9 @@ import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.Constants;
-import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
import com.google.protobuf.Any;
@@ -52,13 +50,15 @@ public abstract class AbstractStream implements Stream {
private final TransportObserver transportObserver;
private final Executor executor;
private final CancellationContext cancellationContext;
- private ServiceDescriptor serviceDescriptor;
+ // AcceptEncoding does not change after the application is started,
+ // so it can be obtained when constructing the stream
+ private final String acceptEncoding;
+
private MethodDescriptor methodDescriptor;
private String methodName;
- private Request request;
private String serializeType;
private StreamObserver<Object> streamSubscriber;
- private TransportObserver transportSubscriber;
+ private AbstractChannelTransportObserver transportSubscriber;
private Compressor compressor = IdentityCompressor.NONE;
private Compressor deCompressor = IdentityCompressor.NONE;
private volatile boolean cancelled = false;
@@ -77,14 +77,7 @@ public abstract class AbstractStream implements Stream {
this.cancellationContext = new CancellationContext();
this.transportObserver = createTransportObserver();
this.streamObserver = createStreamObserver();
- }
-
- public boolean isCancelled() {
- return cancelled;
- }
-
- protected CancellationContext getCancellationContext() {
- return cancellationContext;
+ this.acceptEncoding = Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel());
}
private Executor lookupExecutor(URL url, Executor executor) {
@@ -106,13 +99,20 @@ public abstract class AbstractStream implements Stream {
return new SerializingExecutor(executor);
}
- public Request getRequest() {
- return request;
+ public String getAcceptEncoding() {
+ return acceptEncoding;
}
- public AbstractStream request(Request request) {
- this.request = request;
- return this;
+ public TransportState getState() {
+ return transportSubscriber.state;
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ protected CancellationContext getCancellationContext() {
+ return cancellationContext;
}
@Override
@@ -131,6 +131,7 @@ public abstract class AbstractStream implements Stream {
public AbstractStream method(MethodDescriptor md) {
this.methodDescriptor = md;
+ this.methodName = md.getMethodName();
return this;
}
@@ -163,8 +164,14 @@ public abstract class AbstractStream implements Stream {
protected abstract void cancelByLocal(Throwable throwable);
+ /**
+ * create request StreamObserver
+ */
protected abstract StreamObserver<Object> createStreamObserver();
+ /**
+ * create response TransportObserver
+ */
protected abstract TransportObserver createTransportObserver();
public String getSerializeType() {
@@ -172,8 +179,8 @@ public abstract class AbstractStream implements Stream {
}
public AbstractStream serialize(String serializeType) {
- if ("hessian4".equals(serializeType)) {
- serializeType = "hessian2";
+ if (TripleConstant.HESSIAN4.equals(serializeType)) {
+ serializeType = TripleConstant.HESSIAN2;
}
this.serializeType = serializeType;
return this;
@@ -195,14 +202,6 @@ public abstract class AbstractStream implements Stream {
return methodDescriptor;
}
- public ServiceDescriptor getServiceDescriptor() {
- return serviceDescriptor;
- }
-
- public void setServiceDescriptor(ServiceDescriptor serviceDescriptor) {
- this.serviceDescriptor = serviceDescriptor;
- }
-
public Compressor getCompressor() {
return this.compressor;
}
@@ -256,7 +255,7 @@ public abstract class AbstractStream implements Stream {
}
@Override
- public void subscribe(TransportObserver observer) {
+ public void subscribe(AbstractChannelTransportObserver observer) {
this.transportSubscriber = observer;
}
@@ -284,7 +283,7 @@ public abstract class AbstractStream implements Stream {
}
getTransportSubscriber().onMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
- LOGGER.error("[Triple-Server-Error] status=" + status.code.code + " service=" + getServiceDescriptor().getServiceName()
+ LOGGER.error("[Triple-Error] status=" + status.code.code
+ " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
}
}
@@ -349,7 +348,7 @@ public abstract class AbstractStream implements Stream {
if (TripleHeaderEnum.containsExcludeAttachments(key)) {
continue;
}
- if (key.endsWith("-bin") && key.length() > 4) {
+ if (key.endsWith(TripleConstant.GRPC_BIN_SUFFIX) && key.length() > 4) {
try {
attachments.put(key.substring(0, key.length() - 4), TripleUtil.decodeASCIIByte(header.getValue()));
} catch (Exception e) {
@@ -363,6 +362,9 @@ public abstract class AbstractStream implements Stream {
}
protected void convertAttachment(Metadata metadata, Map<String, Object> attachments) {
+ if (attachments == null) {
+ return;
+ }
for (Map.Entry<String, Object> entry : attachments.entrySet()) {
final String key = entry.getKey().toLowerCase(Locale.ROOT);
if (Http2Headers.PseudoHeaderName.isPseudoHeader(key)) {
@@ -376,6 +378,13 @@ public abstract class AbstractStream implements Stream {
}
}
+ /**
+ * Convert a single user attachment value into a metadata entry
+ *
+ * @param metadata {@link Metadata}
+ * @param key metadata key
+ * @param v metadata value (only String and byte[] values are converted)
+ */
private void convertSingleAttachment(Metadata metadata, String key, Object v) {
try {
if (v instanceof String) {
@@ -383,7 +392,7 @@ public abstract class AbstractStream implements Stream {
metadata.put(key, str);
} else if (v instanceof byte[]) {
String str = TripleUtil.encodeBase64ASCII((byte[]) v);
- metadata.put(key + "-bin", str);
+ metadata.put(key + TripleConstant.GRPC_BIN_SUFFIX, str);
}
} catch (Throwable t) {
LOGGER.warn("Meet exception when convert single attachment key:" + key + " value=" + v, t);
@@ -412,7 +421,10 @@ public abstract class AbstractStream implements Stream {
@Override
public void onReset(Http2Error http2Error) {
- getTransportSubscriber().onReset(http2Error);
+ if (getState().allowSendReset()) {
+ getState().setResetSend();
+ getTransportSubscriber().onReset(http2Error);
+ }
}
@Override
@@ -454,14 +466,19 @@ public abstract class AbstractStream implements Stream {
@Override
public void onComplete() {
- final GrpcStatus status = extractStatusFromMeta(getHeaders());
- if (Code.isOk(status.code.code)) {
- doOnComplete();
- } else {
- onError(status);
- }
+ execute(() -> {
+ final GrpcStatus status = extractStatusFromMeta(getHeaders());
+ if (Code.isOk(status.code.code)) {
+ doOnComplete();
+ } else {
+ onError(status);
+ }
+ });
}
+ /**
+ * Exceptions thrown by this method must be caught by the implementing class
+ */
protected abstract void doOnComplete();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 42e7089..b868d87 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -19,8 +19,10 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
+import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcInvocation;
public class ClientStream extends AbstractClientStream implements Stream {
@@ -29,75 +31,81 @@ public class ClientStream extends AbstractClientStream implements Stream {
}
@Override
- protected StreamObserver<Object> createStreamObserver() {
- return new ClientStreamObserverImpl(getCancellationContext());
+ protected TransportObserver createTransportObserver() {
+ return new ClientTransportObserverImpl();
}
@Override
- protected TransportObserver createTransportObserver() {
- return new AbstractTransportObserver() {
-
- @Override
- public void onData(byte[] data, boolean endStream) {
- execute(() -> {
- final Object resp = deserializeResponse(data);
- getStreamSubscriber().onNext(resp);
- });
- }
-
- @Override
- public void onComplete() {
- execute(() -> {
- final GrpcStatus status = extractStatusFromMeta(getHeaders());
-
- if (GrpcStatus.Code.isOk(status.code.code)) {
- getStreamSubscriber().onCompleted();
- } else {
- getStreamSubscriber().onError(status.asException());
- }
- });
- }
- };
+ protected void doOnStartCall() {
+ Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+ AppResponse result = getMethodDescriptor().isServerStream() ? callServerStream() : callBiStream();
+ response.setResult(result);
+ DefaultFuture2.received(getConnection(), response);
}
- private class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+ private AppResponse callServerStream() {
+ StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[1];
+ obServer = attachCancelContext(obServer, getCancellationContext());
+ subscribe(obServer);
+ asStreamObserver().onNext(getRpcInvocation().getArguments()[0]);
+ asStreamObserver().onCompleted();
+ return new AppResponse();
+ }
- private boolean metaSent;
+ private AppResponse callBiStream() {
+ StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[0];
+ obServer = attachCancelContext(obServer, getCancellationContext());
+ subscribe(obServer);
+ return new AppResponse(asStreamObserver());
+ }
- public ClientStreamObserverImpl(CancellationContext cancellationContext) {
- super(cancellationContext);
- this.metaSent = false;
+ private <T> StreamObserver<T> attachCancelContext(StreamObserver<T> observer, CancellationContext context) {
+ if (observer instanceof CancelableStreamObserver) {
+ CancelableStreamObserver<T> streamObserver = (CancelableStreamObserver<T>) observer;
+ streamObserver.setCancellationContext(context);
+ return streamObserver;
}
+ return observer;
+ }
- @Override
- public void onNext(Object data) {
- if (!metaSent) {
- metaSent = true;
- final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
- getTransportSubscriber().onMetadata(metadata, false);
- }
- final byte[] bytes = encodeRequest(data);
- getTransportSubscriber().onData(bytes, false);
- }
+ private class ClientTransportObserverImpl extends AbstractTransportObserver {
+
+ private boolean error = false;
@Override
- public void onError(Throwable throwable) {
- transportError(throwable);
+ public void onData(byte[] data, boolean endStream) {
+ execute(() -> {
+ try {
+ final Object resp = deserializeResponse(data);
+ getStreamSubscriber().onNext(resp);
+ } catch (Throwable throwable) {
+ onError(throwable);
+ }
+ });
}
@Override
- public void onCompleted() {
- getTransportSubscriber().onComplete();
+ public void onComplete() {
+ execute(() -> {
+ getState().setServerEndStreamReceived();
+ final GrpcStatus status = extractStatusFromMeta(getHeaders());
+ if (GrpcStatus.Code.isOk(status.code.code)) {
+ getStreamSubscriber().onCompleted();
+ } else {
+ onError(status.asException()); // as the replaced code did; status.cause may be null (NOTE(review): confirm)
+ }
+ });
+ }
- @Override
- public void setCompression(String compression) {
- if (metaSent) {
- cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+ private void onError(Throwable throwable) {
+ if (error) {
return;
}
- Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
- setCompressor(compressor);
+ error = true;
+ if (!getState().serverSendStreamReceived()) {
+ cancel(throwable);
+ }
+ getStreamSubscriber().onError(throwable);
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 3715e98..1a53881 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -17,10 +17,11 @@
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
@@ -28,86 +29,47 @@ import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
-import io.netty.util.AsciiString;
-public class ClientTransportObserver implements TransportObserver {
- private final AsciiString SCHEME;
- private final ChannelHandlerContext ctx;
- private final Http2StreamChannel streamChannel;
- private final ChannelPromise promise;
- private boolean headerSent = false;
- private boolean endStreamSent = false;
- private boolean resetSent = false;
+public class ClientTransportObserver extends AbstractChannelTransportObserver {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientTransportObserver.class);
+ private final ChannelPromise promise;
+ private final Http2StreamChannel streamChannel;
- public ClientTransportObserver(ChannelHandlerContext ctx, AbstractClientStream stream, ChannelPromise promise) {
- this.ctx = ctx;
+ public ClientTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
+ this.streamChannel = channel;
this.promise = promise;
- Boolean ssl = ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).get();
- if (ssl != null && ssl) {
- SCHEME = TripleConstant.HTTPS_SCHEME;
- } else {
- SCHEME = TripleConstant.HTTP_SCHEME;
- }
-
- final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
- streamChannel = streamChannelBootstrap.open().syncUninterruptibly().getNow();
-
- final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
- streamChannel.pipeline().addLast(responseHandler)
- .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
- .addLast(new TripleClientInboundHandler());
- streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
}
@Override
- public void onMetadata(Metadata metadata, boolean endStream) {
- if (headerSent) {
- return;
- }
- if (resetSent) {
- return;
- }
- final Http2Headers headers = new DefaultHttp2Headers(true)
- .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
- .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
- .scheme(SCHEME)
- .method(HttpMethod.POST.asciiName());
+ protected void doOnMetadata(Metadata metadata, boolean endStream) {
+ final Http2Headers headers = new DefaultHttp2Headers(true);
metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
- headerSent = true;
streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
.addListener(future -> {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
}
});
-
}
@Override
- public void onReset(Http2Error http2Error) {
- resetSent = true;
- streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+ protected void doOnData(byte[] data, boolean endStream) {
+ ByteBuf buf = streamChannel.alloc().buffer();
+ buf.writeByte(getCompressFlag());
+ buf.writeInt(data.length);
+ buf.writeBytes(data);
+ streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
.addListener(future -> {
- if (future.isSuccess()) {
- promise.trySuccess();
- } else {
+ if (!future.isSuccess()) {
promise.tryFailure(future.cause());
}
});
}
@Override
- public void onData(byte[] data, boolean endStream) {
- if (resetSent) {
- return;
- }
- ByteBuf buf = ctx.alloc().buffer();
- buf.writeByte(getCompressFlag());
- buf.writeInt(data.length);
- buf.writeBytes(data);
- streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
+ protected void doOnReset(Http2Error http2Error) {
+ streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
.addListener(future -> {
if (future.isSuccess()) {
promise.trySuccess();
@@ -118,14 +80,7 @@ public class ClientTransportObserver implements TransportObserver {
}
@Override
- public void onComplete() {
- if (resetSent) {
- return;
- }
- if (endStreamSent) {
- return;
- }
- endStreamSent = true;
+ protected void doOnComplete() {
streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
.addListener(future -> {
if (future.isSuccess()) {
@@ -138,7 +93,6 @@ public class ClientTransportObserver implements TransportObserver {
private int getCompressFlag() {
AbstractClientStream stream = streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).get();
- return TransportObserver.calcCompressFlag(stream.getCompressor());
+ return calcCompressFlag(stream.getCompressor());
}
-
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
index c6a2791..09c6e6b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -34,6 +34,8 @@ import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
@SPI(value = DEFAULT_COMPRESSOR, scope = ExtensionScope.FRAMEWORK)
public interface Compressor {
+ Compressor NONE = new IdentityCompressor();
+
String DEFAULT_COMPRESSOR = "identity";
static Compressor getCompressor(FrameworkModel frameworkModel, String compressorStr) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index c21ec32..56a3a21 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.rpc.RpcException;
@@ -38,8 +39,8 @@ import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code.UNAVAILABLE;
public class GrpcStatus {
public final Code code;
- public final Throwable cause;
- public final String description;
+ public Throwable cause;
+ public String description;
public GrpcStatus(Code code, Throwable cause, String description) {
this.code = code;
@@ -55,6 +56,10 @@ public class GrpcStatus {
return new GrpcStatus(code, null, null);
}
+ public static GrpcStatus fromCodeWithDescription(Code code, String description) {
+ return new GrpcStatus(code, null, description);
+ }
+
public static byte toDubboStatus(Code code) {
byte status;
switch (code) {
@@ -90,7 +95,26 @@ public class GrpcStatus {
return status;
}
- public static GrpcStatus rpcExceptionCodeToGrpc(int rpcExceptionCode) {
+ /**
+ * TODO: convert the remaining exception types to a status as well.
+ */
+ public static GrpcStatus getStatus(Throwable throwable) {
+ return getStatus(throwable, null);
+ }
+
+ public static GrpcStatus getStatus(Throwable throwable, String description) {
+ if (throwable instanceof RpcException) {
+ RpcException rpcException = (RpcException) throwable;
+ Code code = rpcExceptionCodeToGrpcCode(rpcException.getCode());
+ return new GrpcStatus(code, throwable, description);
+ }
+ if (throwable instanceof TimeoutException) {
+ return new GrpcStatus(GrpcStatus.Code.DEADLINE_EXCEEDED, throwable, description);
+ }
+ return new GrpcStatus(Code.UNKNOWN, throwable, description);
+ }
+
+ public static Code rpcExceptionCodeToGrpcCode(int rpcExceptionCode) {
Code code;
switch (rpcExceptionCode) {
case TIMEOUT_EXCEPTION:
@@ -114,7 +138,7 @@ public class GrpcStatus {
code = Code.UNKNOWN;
break;
}
- return fromCode(code);
+ return code;
}
public static String limitSizeTo4KB(String desc) {
@@ -133,11 +157,13 @@ public class GrpcStatus {
}
public GrpcStatus withCause(Throwable cause) {
- return new GrpcStatus(this.code, cause, this.description);
+ this.cause = cause;
+ return this;
}
public GrpcStatus withDescription(String description) {
- return new GrpcStatus(this.code, this.cause, description);
+ this.description = description;
+ return this;
}
public RpcException asException() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
index a8ac2fc..08d27b7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
@@ -22,7 +22,6 @@ package org.apache.dubbo.rpc.protocol.tri;
*/
public class IdentityCompressor implements Compressor {
- public static final Compressor NONE = new IdentityCompressor();
@Override
public String getMessageEncoding() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 7958b61..86a811d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -40,20 +40,26 @@ public class ServerStream extends AbstractServerStream implements Stream {
}
private class ServerStreamObserverImpl implements ServerStreamObserver<Object> {
- private boolean headersSent;
@Override
public void onNext(Object data) {
- if (!headersSent) {
- getTransportSubscriber().onMetadata(createRequestMeta(), false);
- headersSent = true;
+ if (getState().allowSendMeta()) {
+ getTransportSubscriber().onMetadata(createResponseMeta(), false);
}
final byte[] bytes = encodeResponse(data);
- getTransportSubscriber().onData(bytes, false);
+ if (bytes == null) {
+ return;
+ }
+ if (getState().allowSendData()) {
+ getTransportSubscriber().onData(bytes, false);
+ }
}
@Override
public void onError(Throwable throwable) {
+ if (!getState().allowSendEndStream()) {
+ return;
+ }
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withCause(throwable)
.withDescription("Biz exception");
@@ -62,15 +68,15 @@ public class ServerStream extends AbstractServerStream implements Stream {
@Override
public void onCompleted() {
- Metadata metadata = new DefaultMetadata();
- metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), "OK");
- metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
- getTransportSubscriber().onMetadata(metadata, true);
+ if (!getState().allowSendEndStream()) {
+ return;
+ }
+ getTransportSubscriber().onMetadata(TripleConstant.SUCCESS_RESPONSE_META, true);
}
@Override
public void setCompression(String compression) {
- if (headersSent) {
+ if (!getState().allowSendMeta()) {
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withDescription("Metadata already has been sent,can not set compression");
transportError(status);
@@ -110,6 +116,10 @@ public class ServerStream extends AbstractServerStream implements Stream {
final RpcInvocation inv = buildInvocation(metadata);
inv.setArguments(new Object[]{asStreamObserver()});
final Result result = getInvoker().invoke(inv);
+ if (result.hasException()) {
+ transportError(GrpcStatus.getStatus(result.getException()));
+ return;
+ }
try {
subscribe((StreamObserver<Object>) result.getValue());
} catch (Throwable t) {
@@ -168,7 +178,10 @@ public class ServerStream extends AbstractServerStream implements Stream {
final Object[] arguments = deserializeRequest(in);
if (arguments != null) {
inv.setArguments(new Object[]{arguments[0], asStreamObserver()});
- getInvoker().invoke(inv);
+ final Result result = getInvoker().invoke(inv);
+ if (result.hasException()) {
+ transportError(GrpcStatus.getStatus(result.getException()));
+ }
}
} finally {
RpcContext.removeCancellationContext();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index b25fef9..0e09ad4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -21,42 +21,31 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-
-public class ServerTransportObserver implements TransportObserver {
+public class ServerTransportObserver extends AbstractChannelTransportObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerTransportObserver.class);
private final ChannelHandlerContext ctx;
- private boolean headerSent = false;
- private boolean resetSent = false;
public ServerTransportObserver(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
- @Override
- public void onMetadata(Metadata metadata, boolean endStream) {
- if (resetSent) {
- return;
- }
- final DefaultHttp2Headers headers = new DefaultHttp2Headers(true);
- metadata.forEach(e -> {
- headers.set(e.getKey(), e.getValue());
- });
- if (!headerSent) {
- headerSent = true;
- headers.status(OK.codeAsText());
- headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
+ public void onMetadata(Http2Headers headers, boolean endStream) {
+ if (endStream) {
+ state.setEndStreamSend();
+ } else {
+ state.setMetaSend();
}
- // If endStream is true, the channel will be closed, so you cannot listen for errors and continue sending any frame
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
.addListener(future -> {
if (!future.isSuccess()) {
@@ -66,8 +55,33 @@ public class ServerTransportObserver implements TransportObserver {
}
@Override
- public void onReset(Http2Error http2Error) {
- resetSent = true;
+ public void onMetadata(Metadata metadata, boolean endStream) {
+ doOnMetadata(metadata, endStream);
+ }
+
+ @Override
+ public void onData(byte[] data, boolean endStream) {
+ doOnData(data, endStream);
+ }
+
+ @Override
+ protected void doOnMetadata(Metadata metadata, boolean endStream) {
+ final DefaultHttp2Headers headers = new DefaultHttp2Headers(true);
+ metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
+ onMetadata(headers, endStream);
+ }
+
+ @Override
+ protected void doOnData(byte[] data, boolean endStream) {
+ ByteBuf buf = ctx.alloc().buffer();
+ buf.writeByte(getCompressFlag());
+ buf.writeInt(data.length);
+ buf.writeBytes(data);
+ onData(buf, endStream);
+ }
+
+ @Override
+ protected void doOnReset(Http2Error http2Error) {
ctx.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
.addListener(future -> {
if (!future.isSuccess()) {
@@ -77,15 +91,20 @@ public class ServerTransportObserver implements TransportObserver {
}
@Override
- public void onData(byte[] data, boolean endStream) {
- if (resetSent) {
- return;
+ protected void doOnComplete() {
+
+ }
+
+ public void onData(String str, boolean endStream) {
+ ByteBuf buf = ByteBufUtil.writeUtf8(ctx.alloc(), str);
+ onData(buf, endStream);
+ }
+
+ public void onData(ByteBuf buf, boolean endStream) {
+ if (endStream) {
+ state.setEndStreamSend();
}
- ByteBuf buf = ctx.alloc().buffer();
- buf.writeByte(getCompressFlag());
- buf.writeInt(data.length);
- buf.writeBytes(data);
- ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
+ ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
.addListener(future -> {
if (!future.isSuccess()) {
LOGGER.warn("send data error endStream=" + endStream, future.cause());
@@ -93,9 +112,8 @@ public class ServerTransportObserver implements TransportObserver {
});
}
-
private int getCompressFlag() {
AbstractServerStream stream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
- return TransportObserver.calcCompressFlag(stream.getCompressor());
+ return calcCompressFlag(stream.getCompressor());
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
index de83eab..358cc33 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
@@ -23,7 +23,7 @@ import org.apache.dubbo.common.stream.StreamObserver;
/**
* Stream acts as a bi-directional intermediate layer for streaming data processing. It serializes object instance to
* byte[] then send to remote, and deserializes byte[] to object instance from remote. {@link #asTransportObserver()}
- * and {@link #subscribe(TransportObserver)} provide {@link TransportObserver} to send or receive remote data.
+ * and {@link #subscribe(AbstractChannelTransportObserver)} provide {@link TransportObserver} to send or receive remote data.
* {@link #asStreamObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
* as API for users fetching/emitting objects from/to remote peer.
*/
@@ -36,7 +36,7 @@ public interface Stream {
*
* @param observer receives remote byte[] data
*/
- void subscribe(TransportObserver observer);
+ void subscribe(AbstractChannelTransportObserver observer);
/**
* Get a downstream data observer for writing byte[] data to this stream
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 015a87e..7e5bc8f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -21,21 +21,12 @@ import io.netty.handler.codec.http2.Http2Error;
public interface TransportObserver {
- static int calcCompressFlag(Compressor compressor) {
- if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
- return 0;
- }
- return 1;
- }
-
void onMetadata(Metadata metadata, boolean endStream);
void onData(byte[] data, boolean endStream);
- default void onReset(Http2Error http2Error) {
- }
+ void onReset(Http2Error http2Error);
- default void onComplete() {
- }
+ void onComplete();
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
new file mode 100644
index 0000000..4160fd6
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+/**
+ * Tracks, as a set of bit flags, which frames have been sent or received on one stream.
+ * <p>
+ * Flag checks read the volatile {@code state} field directly. Every setter is a read-modify-write,
+ * which a volatile field alone does not make atomic, so the setters are synchronized to keep
+ * concurrent callers from overwriting each other's flag.
+ */
+public class TransportState {
+
+    private static final int META_SEND = 0b0001;
+    private static final int RESET_SEND = 0b0010;
+    private static final int END_STREAM_SEND = 0b0100;
+    private static final int SERVER_SEND_STREAM_RECEIVED = 0b1000;
+
+    // Exact state values required before the corresponding frame may be sent.
+    private static final int ALLOW_META_SEND = 0;
+    private static final int ALLOW_DATA_SEND = META_SEND;
+    private static final int ALLOW_END_STREAM_SEND = META_SEND;
+
+    private volatile int state = 0;
+
+    public TransportState() {
+    }
+
+    /** Replaces the whole flag word with {@code state}. */
+    public synchronized void setState(int state) {
+        this.state = state;
+    }
+
+    /** Sets the meta-sent flag, leaving the other flags untouched. */
+    public synchronized void setMetaSend() {
+        this.state |= META_SEND;
+    }
+
+    /** Sets the reset-sent flag, leaving the other flags untouched. */
+    public synchronized void setResetSend() {
+        this.state |= RESET_SEND;
+    }
+
+    /** Sets the end-stream-sent flag, leaving the other flags untouched. */
+    public synchronized void setEndStreamSend() {
+        this.state |= END_STREAM_SEND;
+    }
+
+    /** Sets the server-end-stream-received flag, leaving the other flags untouched. */
+    public synchronized void setServerEndStreamReceived() {
+        this.state |= SERVER_SEND_STREAM_RECEIVED;
+    }
+
+    /** @return {@code true} if the server-end-stream-received flag is set */
+    public boolean serverSendStreamReceived() {
+        return (this.state & SERVER_SEND_STREAM_RECEIVED) != 0;
+    }
+
+    /** @return {@code true} only while no flag at all is set */
+    public boolean allowSendMeta() {
+        return this.state == ALLOW_META_SEND;
+    }
+
+    /** @return {@code true} unless the reset-sent flag is set */
+    public boolean allowSendReset() {
+        return (this.state & RESET_SEND) != RESET_SEND;
+    }
+
+    /** @return {@code true} only when the meta-sent flag is the sole flag set */
+    public boolean allowSendData() {
+        return this.state == ALLOW_DATA_SEND;
+    }
+
+    /** @return {@code true} only when the meta-sent flag is the sole flag set */
+    public boolean allowSendEndStream() {
+        return this.state == ALLOW_END_STREAM_SEND;
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 83928d2..f215c27 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -16,33 +16,16 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
-import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
-import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
public class TripleClientRequestHandler extends ChannelDuplexHandler {
@@ -63,85 +46,19 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
- final RpcInvocation inv = (RpcInvocation) req.getData();
- final URL url = inv.getInvoker().getUrl();
- ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
-
- MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel, inv);
-
- ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
- final AbstractClientStream stream = AbstractClientStream.newClientStream(url, methodDescriptor.isUnary());
-
- String ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY);
- if (StringUtils.isNotEmpty(ssl)) {
- ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
- }
- // Compressor can not be set by dynamic config
- String compressorStr = ConfigurationUtils
- .getCachedDynamicProperty(inv.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
-
- Compressor compressor = Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
- if (compressor != null) {
- stream.setCompressor(compressor);
- }
-
- stream.service(consumerModel)
- .connection(Connection.getConnectionFromChannel(ctx.channel()))
- .method(methodDescriptor)
- .methodName(methodDescriptor.getMethodName())
- .request(req)
- .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
- .subscribe(new ClientTransportObserver(ctx, stream, promise));
-
- if (methodDescriptor.isUnary()) {
- stream.asStreamObserver().onNext(inv);
- stream.asStreamObserver().onCompleted();
- } else {
- Response response = new Response(req.getId(), req.getVersion());
- AppResponse result;
- // the stream method params is fixed
- if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
- || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
- StreamObserver<Object> obServer = (StreamObserver<Object>) inv.getArguments()[0];
- obServer = attachCancelContext(obServer, stream.getCancellationContext());
- stream.subscribe(obServer);
- result = new AppResponse(stream.asStreamObserver());
- } else {
- StreamObserver<Object> obServer = (StreamObserver<Object>) inv.getArguments()[1];
- obServer = attachCancelContext(obServer, stream.getCancellationContext());
- stream.subscribe(obServer);
- result = new AppResponse();
- stream.asStreamObserver().onNext(inv.getArguments()[0]);
- stream.asStreamObserver().onCompleted();
- }
- response.setResult(result);
- DefaultFuture2.received(stream.getConnection(), response);
- }
- }
-
- /**
- * Get the tri protocol special MethodDescriptor
- */
- private MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
- List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
- if (CollectionUtils.isEmpty(methodDescriptors)) {
- throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
- }
- for (MethodDescriptor methodDescriptor : methodDescriptors) {
- if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
- return methodDescriptor;
- }
- }
- throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
- }
-
-
- public <T> StreamObserver<T> attachCancelContext(StreamObserver<T> observer, CancellationContext context) {
- if (observer instanceof CancelableStreamObserver) {
- CancelableStreamObserver<T> streamObserver = ((CancelableStreamObserver<T>) observer);
- streamObserver.setCancellationContext(context);
- return streamObserver;
- }
- return observer;
+ Connection connection = Connection.getConnectionFromChannel(ctx.channel());
+ final AbstractClientStream stream = AbstractClientStream.newClientStream(req, connection);
+ final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
+ streamChannelBootstrap.open()
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ final Http2StreamChannel curChannel = (Http2StreamChannel) future.get();
+ // Start call only when the channel creation is successful
+ stream.startCall(curChannel, promise);
+ } else {
+ promise.tryFailure(future.cause());
+ DefaultFuture2.getFuture(req.getId()).cancel();
+ }
+ });
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 88c534f..91ace1c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -21,24 +21,40 @@ import org.apache.dubbo.common.constants.CommonConstants;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
-public interface TripleConstant {
- String CONTENT_PROTO = "application/grpc+proto";
- String APPLICATION_GRPC = "application/grpc";
- String TRI_VERSION = "1.0.0";
+public class TripleConstant {
- String SERIALIZATION_KEY = "serialization";
- String TE_KEY = "te";
+ public static final String CONTENT_PROTO = "application/grpc+proto";
+ public static final String APPLICATION_GRPC = "application/grpc";
+ public static final String TEXT_PLAIN_UTF8 = "text/plain; encoding=utf-8";
+ public static final String TRI_VERSION = "1.0.0";
- AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
+ public static final String SERIALIZATION_KEY = "serialization";
+ public static final String TE_KEY = "te";
- AsciiString HTTPS_SCHEME = AsciiString.of("https");
- AsciiString HTTP_SCHEME = AsciiString.of("http");
+ public static final String HESSIAN4 = "hessian4";
+ public static final String HESSIAN2 = "hessian2";
- AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.newInstance(
- "tri_server_stream");
- AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
- "tri_client_stream");
+ public static final String GRPC_BIN_SUFFIX = "-bin";
+
+ public static final AsciiString HTTPS_SCHEME = AsciiString.of("https");
+ public static final AsciiString HTTP_SCHEME = AsciiString.of("http");
+
+ public static final AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
+ public static final AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.valueOf("tri_server_stream");
+ public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.valueOf("tri_client_stream");
+
+ public static final String SUCCESS_RESPONSE_MESSAGE = "OK";
+ public static final String SUCCESS_RESPONSE_STATUS = Integer.toString(GrpcStatus.Code.OK.code);
+
+ public static final Metadata SUCCESS_RESPONSE_META = getSuccessResponseMeta();
+
+ private static Metadata getSuccessResponseMeta() {
+ Metadata metadata = new DefaultMetadata();
+ metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_MESSAGE);
+ metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_STATUS);
+ return metadata;
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index c3e783a..46207bb 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
import org.apache.dubbo.rpc.model.MethodDescriptor;
@@ -37,6 +36,7 @@ import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Frame;
@@ -47,6 +47,7 @@ import io.netty.util.ReferenceCountUtil;
import java.util.List;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
@@ -94,12 +95,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Exception in processing triple message", cause);
}
- if (cause instanceof RpcException) {
- TripleUtil.responseErr(ctx, GrpcStatus.rpcExceptionCodeToGrpc(((RpcException) cause).getCode()));
- } else {
- TripleUtil.responseErr(ctx, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Provider's error:\n" + cause.getMessage()));
- }
+ GrpcStatus status = GrpcStatus.getStatus(cause, "Provider's error:\n" + cause.getMessage());
+ final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+ serverStream.transportError(status, null, true);
}
public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception {
@@ -128,23 +126,24 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
final Http2Headers headers = msg.headers();
+ ServerTransportObserver transportObserver = new ServerTransportObserver(ctx);
if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
- TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
+ responsePlainTextError(transportObserver, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withDescription(String.format("Method '%s' is not supported", headers.method())));
return;
}
if (headers.path() == null) {
- TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+ responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
return;
}
final String path = headers.path().toString();
if (path.charAt(0) != '/') {
- TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+ responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
.withDescription(String.format("Expected path to start with /: %s", path)));
return;
@@ -152,15 +151,15 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
if (contentType == null) {
- TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+ responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
.withDescription("Content-Type is missing from the request"));
return;
}
final String contentString = contentType.toString();
- if (!TripleUtil.supportContentType(contentString)) {
- TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+ if (!supportContentType(contentString)) {
+ responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
GrpcStatus.fromCode(Code.INTERNAL.code)
.withDescription(String.format("Content-Type '%s' is not supported", contentString)));
return;
@@ -168,7 +167,8 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
String[] parts = path.split("/");
if (parts.length != 3) {
- TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Bad path format:" + path));
+ responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+ .withDescription("Bad path format:" + path));
return;
}
String serviceName = parts[1];
@@ -177,15 +177,15 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final Invoker<?> invoker = getInvoker(headers, serviceName);
if (invoker == null) {
- TripleUtil.responseErr(ctx,
- GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+ responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+ .withDescription("Service not found:" + serviceName));
return;
}
FrameworkServiceRepository repo = frameworkModel.getServiceRepository();
ProviderModel providerModel = repo.lookupExportedService(invoker.getUrl().getServiceKey());
if (providerModel == null || providerModel.getServiceModel() == null) {
- TripleUtil.responseErr(ctx,
- GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+ responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+ .withDescription("Service not found:" + serviceName));
return;
}
@@ -201,7 +201,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
} else {
methodDescriptors = providerModel.getServiceModel().getMethods(methodName);
if (CollectionUtils.isEmpty(methodDescriptors)) {
- TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+ responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
.withDescription("Method :" + methodName + " not found of service:" + serviceName));
return;
}
@@ -210,6 +210,22 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
methodDescriptor = methodDescriptors.get(0);
}
}
+
+ Compressor deCompressor = Compressor.NONE;
+ CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
+ if (null != messageEncoding) {
+ String compressorStr = messageEncoding.toString();
+ if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
+ Compressor compressor = Compressor.getCompressor(frameworkModel, compressorStr);
+ if (null == compressor) {
+ responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+ .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
+ return;
+ }
+ deCompressor = compressor;
+ }
+ }
+
boolean isUnary = methodDescriptor != null && methodDescriptor.isUnary();
final AbstractServerStream stream = AbstractServerStream.newServerStream(invoker.getUrl(), isUnary);
@@ -218,27 +234,15 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
stream.service(providerModel.getServiceModel())
.invoker(invoker)
.methodName(methodName)
- .subscribe(new ServerTransportObserver(ctx));
+ .setDeCompressor(deCompressor)
+ .subscribe(transportObserver);
if (methodDescriptor != null) {
stream.method(methodDescriptor);
} else {
// Then you need to find the corresponding parameter according to the request body
stream.methods(methodDescriptors);
}
- CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
- if (null != messageEncoding) {
- String compressorStr = messageEncoding.toString();
- if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
- Compressor compressor = Compressor.getCompressor(frameworkModel, compressorStr);
- if (null == compressor) {
- TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
- GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
- .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
- } else {
- stream.setDeCompressor(compressor);
- }
- }
- }
+
final TransportObserver observer = stream.asTransportObserver();
observer.onMetadata(new Http2HeaderMeta(headers), false);
if (msg.isEndStream()) {
@@ -247,6 +251,34 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
channel.attr(TripleConstant.SERVER_STREAM_KEY).set(stream);
}
+ /**
+ * must start with application/grpc
+ */
+ private boolean supportContentType(String contentType) {
+ if (contentType == null) {
+ return false;
+ }
+ return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
+ }
+
+ private void responsePlainTextError(ServerTransportObserver observer, int code, GrpcStatus status) {
+ Http2Headers headers = new DefaultHttp2Headers(true)
+ .status(String.valueOf(code))
+ .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
+ .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
+ .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.TEXT_PLAIN_UTF8);
+ observer.onMetadata(headers, false);
+ observer.onData(status.description, true);
+ }
+
+ private void responseErr(ServerTransportObserver observer, GrpcStatus status) {
+ Http2Headers trailers = new DefaultHttp2Headers()
+ .status(OK.codeAsText())
+ .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
+ .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
+ .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
+ observer.onMetadata(trailers, true);
+ }
private boolean isEcho(String methodName) {
return CommonConstants.$ECHO.equals(methodName);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index f1f268c..2fe6687 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -28,14 +28,6 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.DebugInfo;
import com.google.rpc.ErrorInfo;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
-import io.netty.handler.codec.http2.Http2Headers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -51,8 +43,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-
public class TripleUtil {
// Some exceptions are not very useful and add too much noise to the log
private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
@@ -73,36 +63,6 @@ public class TripleUtil {
return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
}
- /**
- * must starts from application/grpc
- */
- public static boolean supportContentType(String contentType) {
- if (contentType == null) {
- return false;
- }
- return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
- }
-
- public static void responseErr(ChannelHandlerContext ctx, GrpcStatus status) {
- Http2Headers trailers = new DefaultHttp2Headers()
- .status(OK.codeAsText())
- .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
- .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
- .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
- ctx.writeAndFlush(new DefaultHttp2HeadersFrame(trailers, true));
- }
-
- public static void responsePlainTextError(ChannelHandlerContext ctx, int code, GrpcStatus status) {
- Http2Headers headers = new DefaultHttp2Headers(true)
- .status("" + code)
- .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
- .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
- .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), "text/plain; encoding=utf-8");
- ctx.write(new DefaultHttp2HeadersFrame(headers));
- ByteBuf buf = ByteBufUtil.writeUtf8(ctx.alloc(), status.description);
- ctx.write(new DefaultHttp2DataFrame(buf, true));
- }
-
public static Object unwrapResp(URL url, TripleWrapper.TripleResponseWrapper wrap,
MultipleSerialization serialization) {
String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
@@ -276,15 +236,15 @@ public class TripleUtil {
}
public static String convertHessianToWrapper(String serializeType) {
- if ("hessian2".equals(serializeType)) {
- return "hessian4";
+ if (TripleConstant.HESSIAN2.equals(serializeType)) {
+ return TripleConstant.HESSIAN4;
}
return serializeType;
}
public static String convertHessianFromWrapper(String serializeType) {
- if ("hessian4".equals(serializeType)) {
- return "hessian2";
+ if (TripleConstant.HESSIAN4.equals(serializeType)) {
+ return TripleConstant.HESSIAN2;
}
return serializeType;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index e90f29e..dc98859 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -18,12 +18,10 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
@@ -34,14 +32,14 @@ import java.util.Map;
public class UnaryClientStream extends AbstractClientStream implements Stream {
-
protected UnaryClientStream(URL url) {
super(url);
}
@Override
- protected StreamObserver<Object> createStreamObserver() {
- return new UnaryClientStreamObserverImpl();
+ protected void doOnStartCall() {
+ asStreamObserver().onNext(getRpcInvocation());
+ asStreamObserver().onCompleted();
}
@Override
@@ -53,45 +51,40 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
@Override
public void doOnComplete() {
- execute(() -> {
- try {
- AppResponse result;
- if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
- final Object resp = deserializeResponse(getData());
- result = new AppResponse(resp);
- } else {
- result = new AppResponse();
- }
- Response response = new Response(getRequest().getId(), TripleConstant.TRI_VERSION);
- result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
- response.setResult(result);
- DefaultFuture2.received(getConnection(), response);
- } catch (Exception e) {
- final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(e)
- .withDescription("Failed to deserialize response");
- onError(status);
+ try {
+ AppResponse result;
+ if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
+ final Object resp = deserializeResponse(getData());
+ result = new AppResponse(resp);
+ } else {
+ result = new AppResponse();
}
- });
+ Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+ result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
+ response.setResult(result);
+ DefaultFuture2.received(getConnection(), response);
+ } catch (Exception e) {
+ final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withCause(e)
+ .withDescription("Failed to deserialize response");
+ onError(status);
+ }
}
@Override
protected void onError(GrpcStatus status) {
- // run in callback executor will truncate exception stack and avoid blocking netty's event loop
- execute(() -> {
- Response response = new Response(getRequest().getId(), TripleConstant.TRI_VERSION);
- response.setErrorMessage(status.description);
- final AppResponse result = new AppResponse();
- final Metadata trailers = getTrailers() == null ? getHeaders() : getTrailers();
- result.setException(getThrowable(trailers));
- result.setObjectAttachments(UnaryClientStream.this.parseMetadataToAttachmentMap(trailers));
- response.setResult(result);
- if (!result.hasException()) {
- final byte code = GrpcStatus.toDubboStatus(status.code);
- response.setStatus(code);
- }
- DefaultFuture2.received(getConnection(), response);
- });
+ Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+ response.setErrorMessage(status.description);
+ final AppResponse result = new AppResponse();
+ final Metadata trailers = getTrailers() == null ? getHeaders() : getTrailers();
+ result.setException(getThrowable(trailers));
+ result.setObjectAttachments(UnaryClientStream.this.parseMetadataToAttachmentMap(trailers));
+ response.setResult(result);
+ if (!result.hasException()) {
+ final byte code = GrpcStatus.toDubboStatus(status.code);
+ response.setStatus(code);
+ }
+ DefaultFuture2.received(getConnection(), response);
}
private Throwable getThrowable(Metadata metadata) {
@@ -123,26 +116,4 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
}
}
}
-
-
- private class UnaryClientStreamObserverImpl implements StreamObserver<Object> {
-
- @Override
- public void onNext(Object data) {
- RpcInvocation invocation = (RpcInvocation) data;
- final Metadata metadata = createRequestMeta(invocation);
- getTransportSubscriber().onMetadata(metadata, false);
- final byte[] bytes = encodeRequest(invocation);
- getTransportSubscriber().onData(bytes, false);
- }
-
- @Override
- public void onError(Throwable throwable) {
- }
-
- @Override
- public void onCompleted() {
- getTransportSubscriber().onComplete();
- }
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index fde24a0..8c3d98b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -19,21 +19,15 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
-import io.netty.handler.codec.http.HttpHeaderNames;
-
-import java.util.Map;
import java.util.concurrent.CompletionStage;
-import java.util.function.BiConsumer;
import java.util.function.Function;
-import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.rpcExceptionCodeToGrpc;
+import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.getStatus;
public class UnaryServerStream extends AbstractServerStream implements Stream {
@@ -60,7 +54,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
@Override
public void doOnComplete() {
if (getData() != null) {
- execute(this::invoke);
+ invoke();
} else {
onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withDescription("Missing request data"));
@@ -68,90 +62,37 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
}
public void invoke() {
-
- RpcInvocation invocation;
- try {
- invocation = buildInvocation(getHeaders());
- final Object[] arguments = deserializeRequest(getData());
- if (arguments != null) {
- invocation.setArguments(arguments);
- } else {
- return;
- }
- } catch (Throwable t) {
- LOGGER.warn("Exception processing triple message", t);
- transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Decode request failed:" + t.getMessage()));
+ RpcInvocation invocation = buildInvocation(getHeaders());
+ final Object[] arguments = deserializeRequest(getData());
+ if (arguments == null) {
return;
}
-
+ invocation.setArguments(arguments);
final Result result = getInvoker().invoke(invocation);
CompletionStage<Object> future = result.thenApply(Function.identity());
-
- BiConsumer<Object, Throwable> onComplete = (appResult, t) -> {
- if (t != null) {
- if (t instanceof TimeoutException) {
- transportError(GrpcStatus.fromCode(GrpcStatus.Code.DEADLINE_EXCEEDED).withCause(t));
- } else {
- transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN).withCause(t));
- }
+ future.whenComplete((o, throwable) -> {
+ if (throwable != null) {
+ LOGGER.error("Invoke error", throwable);
+ transportError(getStatus(throwable));
return;
}
- AppResponse response = (AppResponse) appResult;
- try {
- if (response.hasException()) {
- final Throwable exception = response.getException();
- if (exception instanceof RpcException) {
- transportError(rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
- .withCause(exception), response.getObjectAttachments());
- final GrpcStatus status = rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
- .withCause(exception);
- transportError(status, response.getObjectAttachments());
- } else {
- transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
- .withCause(exception), response.getObjectAttachments());
- }
- return;
- }
- Metadata metadata = createRequestMeta();
- metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
- getTransportSubscriber().onMetadata(metadata, false);
-
- final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
- final byte[] data;
- try {
- ClassLoadUtil.switchContextLoader(
- getProviderModel().getServiceInterfaceClass().getClassLoader());
- data = encodeResponse(response.getValue());
- } finally {
- ClassLoadUtil.switchContextLoader(tccl);
- }
- getTransportSubscriber().onData(data, false);
-
- Metadata trailers = new DefaultMetadata()
- .put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
- final Map<String, Object> attachments = response.getObjectAttachments();
- if (attachments != null) {
- convertAttachment(trailers, attachments);
- }
- getTransportSubscriber().onMetadata(trailers, true);
- } catch (Throwable e) {
- LOGGER.warn("Exception processing triple message", e);
- if (e instanceof RpcException) {
- final GrpcStatus status = rpcExceptionCodeToGrpc(((RpcException) e).getCode())
- .withCause(e);
- transportError(status, response.getObjectAttachments());
- } else {
- transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
- .withDescription("Exception occurred in provider's execution:" + e.getMessage())
- .withCause(e), response.getObjectAttachments());
- }
+ AppResponse response = (AppResponse) o;
+ if (response.hasException()) {
+ transportError(getStatus(response.getException()));
+ return;
}
- };
-
- future.whenComplete(onComplete);
+ Metadata metadata = createResponseMeta();
+ getTransportSubscriber().onMetadata(metadata, false);
+ final byte[] data = encodeResponse(response.getValue());
+ if (data == null) {
+ return;
+ }
+ getTransportSubscriber().onData(data, false);
+ Metadata trailers = TripleConstant.SUCCESS_RESPONSE_META;
+ convertAttachment(trailers, response.getObjectAttachments());
+ getTransportSubscriber().onMetadata(trailers, true);
+ });
RpcContext.removeContext();
}
}
-
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
new file mode 100644
index 0000000..ce6d7be
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TransportStateTest {
+
+ @Test
+ void allowSendMeta() {
+ TransportState transportState = new TransportState();
+ transportState.setMetaSend();
+ Assertions.assertFalse(transportState.allowSendMeta());
+
+ transportState = new TransportState();
+ transportState.setEndStreamSend();
+ Assertions.assertFalse(transportState.allowSendMeta());
+
+ transportState = new TransportState();
+ transportState.setResetSend();
+ Assertions.assertFalse(transportState.allowSendMeta());
+
+ transportState = new TransportState();
+ transportState.setEndStreamSend();
+ Assertions.assertFalse(transportState.allowSendMeta());
+
+ transportState = new TransportState();
+ transportState.setEndStreamSend();
+ transportState.setMetaSend();
+ Assertions.assertFalse(transportState.allowSendMeta());
+
+ transportState = new TransportState();
+ Assertions.assertTrue(transportState.allowSendMeta());
+ }
+
+ @Test
+ void allowSendData() {
+ TransportState transportState = new TransportState();
+ Assertions.assertFalse(transportState.allowSendData());
+
+ transportState = new TransportState();
+ transportState.setResetSend();
+ Assertions.assertFalse(transportState.allowSendData());
+
+ transportState = new TransportState();
+ transportState.setEndStreamSend();
+ Assertions.assertFalse(transportState.allowSendData());
+
+ transportState = new TransportState();
+ transportState.setMetaSend();
+ Assertions.assertTrue(transportState.allowSendData());
+ }
+
+ @Test
+ void allowSendEndStream() {
+ TransportState transportState = new TransportState();
+ Assertions.assertFalse(transportState.allowSendEndStream());
+
+ transportState = new TransportState();
+ transportState.setResetSend();
+ Assertions.assertFalse(transportState.allowSendEndStream());
+
+ transportState = new TransportState();
+ transportState.setEndStreamSend();
+ Assertions.assertFalse(transportState.allowSendEndStream());
+
+ transportState = new TransportState();
+ transportState.setMetaSend();
+ Assertions.assertTrue(transportState.allowSendEndStream());
+
+ }
+
+ @Test
+ void allowSendReset() {
+ TransportState transportState = new TransportState();
+ transportState.setResetSend();
+ Assertions.assertFalse(transportState.allowSendReset());
+
+ transportState = new TransportState();
+ Assertions.assertTrue(transportState.allowSendReset());
+
+ transportState = new TransportState();
+ transportState.setEndStreamSend();
+ Assertions.assertTrue(transportState.allowSendReset());
+
+ transportState = new TransportState();
+ transportState.setMetaSend();
+ Assertions.assertTrue(transportState.allowSendReset());
+ }
+
+ @Test
+ void serverEndStream() {
+ TransportState transportState = new TransportState();
+ Assertions.assertFalse(transportState.serverSendStreamReceived());
+
+ transportState.setServerEndStreamReceived();
+ Assertions.assertTrue(transportState.serverSendStreamReceived());
+
+ }
+}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 5910b3b..0cddc3c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -28,8 +28,6 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;
class UnaryClientStreamTest {
@@ -46,11 +44,10 @@ class UnaryClientStreamTest {
// no subscriber
Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
- TransportObserver transportObserver = Mockito.mock(TransportObserver.class);
+ AbstractChannelTransportObserver transportObserver = Mockito.mock(AbstractChannelTransportObserver.class);
stream.subscribe(transportObserver);
// no method descriptor
Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
- Mockito.verify(transportObserver).onMetadata(any(), anyBoolean());
MethodDescriptor md = Mockito.mock(MethodDescriptor.class);
when(md.isNeedWrap()).thenReturn(true);