You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/03/11 08:54:15 UTC
[dubbo] branch 3.0 updated: [Dubbo3-Triple] Add status and timeout support (#7361)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 0298ec0 [Dubbo3-Triple] Add status and timeout support (#7361)
0298ec0 is described below
commit 0298ec0fd3eacd991c00cd1c4f3239d7e573437d
Author: GuoHao <gu...@gmail.com>
AuthorDate: Thu Mar 11 16:53:47 2021 +0800
[Dubbo3-Triple] Add status and timeout support (#7361)
* Add grpc status to dubbo status conversion
* Add triple timeout support
---
.../dubbo/rpc/protocol/tri/ClientStream.java | 15 +++++--
.../apache/dubbo/rpc/protocol/tri/GrpcStatus.java | 50 +++++++++++++++++++++-
.../dubbo/rpc/protocol/tri/ServerStream.java | 11 ++---
.../rpc/protocol/tri/TripleClientHandler.java | 5 ---
.../dubbo/rpc/protocol/tri/TripleConstant.java | 1 +
5 files changed, 67 insertions(+), 15 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 10ad523..cc78abc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -90,8 +90,8 @@ public class ClientStream extends AbstractStream implements Stream {
} else {
response.setErrorMessage(status.cause.getMessage());
}
- // TODO map grpc status to response status
- response.setStatus(Response.BAD_REQUEST);
+ final byte code = GrpcStatus.toDubboStatus(status.code);
+ response.setStatus(code);
DefaultFuture2.received(Connection.getConnectionFromChannel(getCtx().channel()), response);
}
@@ -106,6 +106,7 @@ public class ClientStream extends AbstractStream implements Stream {
.method(HttpMethod.POST.asciiName())
.path("/" + invocation.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + invocation.getMethodName())
.set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
+ .set(TripleConstant.TIMEOUT, invocation.get(CommonConstants.TIMEOUT_KEY) +"m")
.set(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
final String version = (String) invocation.getObjectAttachment(CommonConstants.VERSION_KEY);
@@ -129,6 +130,8 @@ public class ClientStream extends AbstractStream implements Stream {
if (attachments != null) {
convertAttachment(headers, attachments);
}
+ headers.remove("path");
+ headers.remove("interface");
DefaultHttp2HeadersFrame frame = new DefaultHttp2HeadersFrame(headers);
final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
@@ -163,7 +166,13 @@ public class ClientStream extends AbstractStream implements Stream {
ClassLoadUtil.switchContextLoader(tccl);
}
final DefaultHttp2DataFrame data = new DefaultHttp2DataFrame(out, true);
- streamChannel.write(data);
+ streamChannel.write(data).addListener(f->{
+ if(f.isSuccess()){
+ promise.trySuccess();
+ }else{
+ promise.tryFailure(f.cause());
+ }
+ });
}
public void halfClose() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 3751740..a3edc1e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -16,12 +16,13 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.remoting.exchange.Response;
+
/**
* See https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
*/
public class GrpcStatus {
- public static final GrpcStatus OK = new GrpcStatus(Code.OK, null, "OK");
public final Code code;
public final Throwable cause;
public final String description;
@@ -40,6 +41,41 @@ public class GrpcStatus {
return new GrpcStatus(code, null, null);
}
+ public static byte toDubboStatus(Code code) {
+ byte status;
+ switch (code) {
+ case OK:
+ status = Response.OK;
+ break;
+ case UNKNOWN:
+ status = Response.SERVICE_ERROR;
+ break;
+ case DEADLINE_EXCEEDED:
+ status = Response.SERVER_TIMEOUT;
+ break;
+ case RESOURCE_EXHAUSTED:
+ status = Response.SERVER_THREADPOOL_EXHAUSTED_ERROR;
+ break;
+ case UNIMPLEMENTED:
+ status = Response.SERVICE_NOT_FOUND;
+ break;
+ case INVALID_ARGUMENT:
+ status = Response.BAD_REQUEST;
+ break;
+ case INTERNAL:
+ status = Response.SERVER_ERROR;
+ break;
+ case UNAVAILABLE:
+ case DATA_LOSS:
+ status = Response.CHANNEL_INACTIVE;
+ break;
+ default:
+ status = Response.CLIENT_ERROR;
+ break;
+ }
+ return status;
+ }
+
public GrpcStatus withCause(Throwable cause) {
return new GrpcStatus(this.code, cause, this.description);
}
@@ -54,12 +90,21 @@ public class GrpcStatus {
enum Code {
OK(0),
+ CANCELLED(1),
UNKNOWN(2),
+ INVALID_ARGUMENT(3),
DEADLINE_EXCEEDED(4),
NOT_FOUND(5),
+ ALREADY_EXISTS(6),
+ PERMISSION_DENIED(7),
RESOURCE_EXHAUSTED(8),
+ FAILED_PRECONDITION(9),
+ ABORTED(10),
+ OUT_OF_RANGE(11),
UNIMPLEMENTED(12),
- INTERNAL(13);
+ INTERNAL(13),
+ UNAVAILABLE(14),
+ DATA_LOSS(15);
final int code;
@@ -71,6 +116,7 @@ public class GrpcStatus {
return status == OK.code;
}
+
public static Code fromCode(int code) {
for (Code value : Code.values()) {
if (value.code == code) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index a001364..6f0f831 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -279,15 +279,16 @@ public class ServerStream extends AbstractStream implements Stream {
inv.setParameterTypes(methodDescriptor.getParameterClasses());
inv.setReturnTypes(methodDescriptor.getReturnTypes());
final Map<String, Object> attachments = parseHeadersToMap(getHeaders());
- attachments.remove("content-type");
attachments.remove("interface");
- attachments.remove("tri-service-version");
- attachments.remove("tri-service-group");
attachments.remove("serialization");
attachments.remove("te");
attachments.remove("path");
- attachments.remove("grpc-status");
- attachments.remove("grpc-message");
+ attachments.remove(TripleConstant.CONTENT_TYPE_KEY);
+ attachments.remove(TripleConstant.SERVICE_GROUP);
+ attachments.remove(TripleConstant.SERVICE_VERSION);
+ attachments.remove(TripleConstant.MESSAGE_KEY);
+ attachments.remove(TripleConstant.STATUS_KEY);
+ attachments.remove(TripleConstant.TIMEOUT);
inv.setObjectAttachments(attachments);
return inv;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 3254e64..0108d57 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -42,11 +42,6 @@ public class TripleClientHandler extends ChannelDuplexHandler {
}
@Override
- public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
- super.close(ctx, promise);
- }
-
- @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2SettingsFrame) {
// already handled
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 23ce833..1abe7e2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
public interface TripleConstant {
String STATUS_KEY = "grpc-status";
String MESSAGE_KEY = "grpc-message";
+ String TIMEOUT = "grpc-timeout";
String CONTENT_TYPE_KEY = "content-type";
String CONTENT_PROTO = "application/grpc+proto";
String APPLICATION_GRPC = "application/grpc";