You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/03/11 08:54:15 UTC

[dubbo] branch 3.0 updated: [Dubbo3-Triple] Add status and timeout support (#7361)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 0298ec0  [Dubbo3-Triple] Add status and timeout support (#7361)
0298ec0 is described below

commit 0298ec0fd3eacd991c00cd1c4f3239d7e573437d
Author: GuoHao <gu...@gmail.com>
AuthorDate: Thu Mar 11 16:53:47 2021 +0800

    [Dubbo3-Triple] Add status and timeout support (#7361)
    
    * Add grpc status to dubbo status convention
    
    * Add triple timeout support
---
 .../dubbo/rpc/protocol/tri/ClientStream.java       | 15 +++++--
 .../apache/dubbo/rpc/protocol/tri/GrpcStatus.java  | 50 +++++++++++++++++++++-
 .../dubbo/rpc/protocol/tri/ServerStream.java       | 11 ++---
 .../rpc/protocol/tri/TripleClientHandler.java      |  5 ---
 .../dubbo/rpc/protocol/tri/TripleConstant.java     |  1 +
 5 files changed, 67 insertions(+), 15 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 10ad523..cc78abc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -90,8 +90,8 @@ public class ClientStream extends AbstractStream implements Stream {
         } else {
             response.setErrorMessage(status.cause.getMessage());
         }
-        // TODO map grpc status to response status
-        response.setStatus(Response.BAD_REQUEST);
+        final byte code = GrpcStatus.toDubboStatus(status.code);
+        response.setStatus(code);
         DefaultFuture2.received(Connection.getConnectionFromChannel(getCtx().channel()), response);
     }
 
@@ -106,6 +106,7 @@ public class ClientStream extends AbstractStream implements Stream {
                 .method(HttpMethod.POST.asciiName())
                 .path("/" + invocation.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + invocation.getMethodName())
                 .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
+                .set(TripleConstant.TIMEOUT, invocation.get(CommonConstants.TIMEOUT_KEY) +"m")
                 .set(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
 
         final String version = (String) invocation.getObjectAttachment(CommonConstants.VERSION_KEY);
@@ -129,6 +130,8 @@ public class ClientStream extends AbstractStream implements Stream {
         if (attachments != null) {
             convertAttachment(headers, attachments);
         }
+        headers.remove("path");
+        headers.remove("interface");
         DefaultHttp2HeadersFrame frame = new DefaultHttp2HeadersFrame(headers);
         final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
 
@@ -163,7 +166,13 @@ public class ClientStream extends AbstractStream implements Stream {
             ClassLoadUtil.switchContextLoader(tccl);
         }
         final DefaultHttp2DataFrame data = new DefaultHttp2DataFrame(out, true);
-        streamChannel.write(data);
+        streamChannel.write(data).addListener(f->{
+            if(f.isSuccess()){
+                promise.trySuccess();
+            }else{
+                promise.tryFailure(f.cause());
+            }
+        });
     }
 
     public void halfClose() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 3751740..a3edc1e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -16,12 +16,13 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
+import org.apache.dubbo.remoting.exchange.Response;
+
 /**
  * See https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
  */
 
 public class GrpcStatus {
-    public static final GrpcStatus OK = new GrpcStatus(Code.OK, null, "OK");
     public final Code code;
     public final Throwable cause;
     public final String description;
@@ -40,6 +41,41 @@ public class GrpcStatus {
         return new GrpcStatus(code, null, null);
     }
 
+    public static byte toDubboStatus(Code code) {
+        byte status;
+        switch (code) {
+            case OK:
+                status = Response.OK;
+                break;
+            case UNKNOWN:
+                status = Response.SERVICE_ERROR;
+                break;
+            case DEADLINE_EXCEEDED:
+                status = Response.SERVER_TIMEOUT;
+                break;
+            case RESOURCE_EXHAUSTED:
+                status = Response.SERVER_THREADPOOL_EXHAUSTED_ERROR;
+                break;
+            case UNIMPLEMENTED:
+                status = Response.SERVICE_NOT_FOUND;
+                break;
+            case INVALID_ARGUMENT:
+                status = Response.BAD_REQUEST;
+                break;
+            case INTERNAL:
+                status = Response.SERVER_ERROR;
+                break;
+            case UNAVAILABLE:
+            case DATA_LOSS:
+                status = Response.CHANNEL_INACTIVE;
+                break;
+            default:
+                status = Response.CLIENT_ERROR;
+                break;
+        }
+        return status;
+    }
+
     public GrpcStatus withCause(Throwable cause) {
         return new GrpcStatus(this.code, cause, this.description);
     }
@@ -54,12 +90,21 @@ public class GrpcStatus {
 
     enum Code {
         OK(0),
+        CANCELLED(1),
         UNKNOWN(2),
+        INVALID_ARGUMENT(3),
         DEADLINE_EXCEEDED(4),
         NOT_FOUND(5),
+        ALREADY_EXISTS(6),
+        PERMISSION_DENIED(7),
         RESOURCE_EXHAUSTED(8),
+        FAILED_PRECONDITION(9),
+        ABORTED(10),
+        OUT_OF_RANGE(11),
         UNIMPLEMENTED(12),
-        INTERNAL(13);
+        INTERNAL(13),
+        UNAVAILABLE(14),
+        DATA_LOSS(15);
 
         final int code;
 
@@ -71,6 +116,7 @@ public class GrpcStatus {
             return status == OK.code;
         }
 
+
         public static Code fromCode(int code) {
             for (Code value : Code.values()) {
                 if (value.code == code) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index a001364..6f0f831 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -279,15 +279,16 @@ public class ServerStream extends AbstractStream implements Stream {
         inv.setParameterTypes(methodDescriptor.getParameterClasses());
         inv.setReturnTypes(methodDescriptor.getReturnTypes());
         final Map<String, Object> attachments = parseHeadersToMap(getHeaders());
-        attachments.remove("content-type");
         attachments.remove("interface");
-        attachments.remove("tri-service-version");
-        attachments.remove("tri-service-group");
         attachments.remove("serialization");
         attachments.remove("te");
         attachments.remove("path");
-        attachments.remove("grpc-status");
-        attachments.remove("grpc-message");
+        attachments.remove(TripleConstant.CONTENT_TYPE_KEY);
+        attachments.remove(TripleConstant.SERVICE_GROUP);
+        attachments.remove(TripleConstant.SERVICE_VERSION);
+        attachments.remove(TripleConstant.MESSAGE_KEY);
+        attachments.remove(TripleConstant.STATUS_KEY);
+        attachments.remove(TripleConstant.TIMEOUT);
         inv.setObjectAttachments(attachments);
         return inv;
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 3254e64..0108d57 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -42,11 +42,6 @@ public class TripleClientHandler extends ChannelDuplexHandler {
     }
 
     @Override
-    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-        super.close(ctx, promise);
-    }
-
-    @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg instanceof Http2SettingsFrame) {
             // already handled
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 23ce833..1abe7e2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 public interface TripleConstant {
     String STATUS_KEY = "grpc-status";
     String MESSAGE_KEY = "grpc-message";
+    String TIMEOUT = "grpc-timeout";
     String CONTENT_TYPE_KEY = "content-type";
     String CONTENT_PROTO = "application/grpc+proto";
     String APPLICATION_GRPC = "application/grpc";