You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/11/01 02:32:14 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] Add WriteQueue for Triple (#9160)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new ef2c0b3  [3.0-Triple] Add WriteQueue for Triple (#9160)
ef2c0b3 is described below

commit ef2c0b3fcc3c42e7afb28dbc7cfe5d78f824c5c5
Author: earthchen <yo...@duobei.com>
AuthorDate: Sun Oct 31 21:31:58 2021 -0500

    [3.0-Triple] Add WriteQueue for Triple (#9160)
    
    * Add WriteQueue
    
    * remove unused
    
    * fix rat
    
    * delay flush
    
    * add flush command
---
 .../rpc/protocol/tri/AbstractClientStream.java     |  6 +-
 .../tri/ClientOutboundTransportObserver.java       | 36 +++------
 .../protocol/tri/OutboundTransportObserver.java    | 13 ++--
 .../tri/ServerOutboundTransportObserver.java       | 51 ++++---------
 .../protocol/tri/TripleClientRequestHandler.java   |  5 +-
 .../protocol/tri/TripleCommandOutBoundHandler.java | 37 +++++++++
 .../tri/TripleHttp2FrameServerHandler.java         | 21 +++---
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  1 +
 .../apache/dubbo/rpc/protocol/tri/WriteQueue.java  | 87 ++++++++++++++++++++++
 .../protocol/tri/command/CancelQueueCommand.java   | 44 +++++++++++
 .../rpc/protocol/tri/command/DataQueueCommand.java | 84 +++++++++++++++++++++
 .../protocol/tri/command/FlushQueueCommand.java    | 29 ++++++++
 .../protocol/tri/command/HeaderQueueCommand.java   | 81 ++++++++++++++++++++
 .../rpc/protocol/tri/command/QueuedCommand.java    | 69 +++++++++++++++++
 .../protocol/tri/command/TextDataQueueCommand.java | 46 ++++++++++++
 15 files changed, 524 insertions(+), 86 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 2d75ca0..b4acdbc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -40,7 +40,6 @@ import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
 import io.netty.util.AsciiString;
 
 import java.io.ByteArrayInputStream;
@@ -141,12 +140,11 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
     }
 
-    protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
+    protected void startCall(WriteQueue queue, ChannelPromise promise) {
         execute(() -> {
-            final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(channel, promise);
+            final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(queue, promise);
             subscribe(clientTransportObserver);
             try {
-                DefaultFuture2.addTimeoutListener(getRequestId(), channel::close);
                 doOnStartCall();
             } catch (Throwable throwable) {
                 cancel(throwable);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
index c939ae5..672802b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
@@ -17,15 +17,11 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.buffer.ByteBuf;
+import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+
 import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
-import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
 
 /**
  * Send stream data to remote
@@ -34,18 +30,15 @@ import io.netty.handler.codec.http2.Http2StreamChannel;
 public class ClientOutboundTransportObserver extends OutboundTransportObserver {
 
     private final ChannelPromise promise;
-    private final Http2StreamChannel streamChannel;
 
-    public ClientOutboundTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
-        this.streamChannel = channel;
+    public ClientOutboundTransportObserver(WriteQueue writeQueue, ChannelPromise promise) {
+        super(writeQueue);
         this.promise = promise;
     }
 
     @Override
     protected void doOnMetadata(Metadata metadata, boolean endStream) {
-        final Http2Headers headers = new DefaultHttp2Headers(true);
-        metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
-        streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
+        writeQueue.enqueue(HeaderQueueCommand.createHeaders(metadata, endStream), true)
             .addListener(future -> {
                 if (!future.isSuccess()) {
                     promise.tryFailure(future.cause());
@@ -55,11 +48,7 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
 
     @Override
     protected void doOnData(byte[] data, boolean endStream) {
-        ByteBuf buf = streamChannel.alloc().buffer();
-        buf.writeByte(getCompressFlag());
-        buf.writeInt(data.length);
-        buf.writeBytes(data);
-        streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
+        writeQueue.enqueue(DataQueueCommand.createGrpcCommand(data, endStream, true), true)
             .addListener(future -> {
                 if (!future.isSuccess()) {
                     promise.tryFailure(future.cause());
@@ -69,7 +58,7 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
 
     @Override
     protected void doOnError(GrpcStatus status) {
-        streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
+        writeQueue.enqueue(CancelQueueCommand.createCommand(status), true)
             .addListener(future -> {
                 if (future.isSuccess()) {
                     promise.trySuccess();
@@ -81,7 +70,7 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
 
     @Override
     protected void doOnComplete() {
-        streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
+        writeQueue.enqueue(DataQueueCommand.createGrpcCommand(true), true)
             .addListener(future -> {
                 if (future.isSuccess()) {
                     promise.trySuccess();
@@ -90,9 +79,4 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
                 }
             });
     }
-
-    private int getCompressFlag() {
-        AbstractClientStream stream = streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).get();
-        return calcCompressFlag(stream.getCompressor());
-    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
index 4194b4f..0a99ab5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
@@ -23,6 +23,11 @@ package org.apache.dubbo.rpc.protocol.tri;
 public abstract class OutboundTransportObserver implements TransportObserver {
 
     protected final TransportState state = new TransportState();
+    protected final WriteQueue writeQueue;
+
+    public OutboundTransportObserver(WriteQueue writeQueue) {
+        this.writeQueue = writeQueue;
+    }
 
     @Override
     public void onMetadata(Metadata metadata, boolean endStream) {
@@ -87,14 +92,6 @@ public abstract class OutboundTransportObserver implements TransportObserver {
 
     protected abstract void doOnComplete();
 
-
-    protected int calcCompressFlag(Compressor compressor) {
-        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
-            return 0;
-        }
-        return 1;
-    }
-
 }
 
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
index 4f74ec9..584be8d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
@@ -19,30 +19,24 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
-import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
-import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 
 public class ServerOutboundTransportObserver extends OutboundTransportObserver {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerOutboundTransportObserver.class);
 
-    private final ChannelHandlerContext ctx;
-
-    public ServerOutboundTransportObserver(ChannelHandlerContext ctx) {
-        this.ctx = ctx;
+    public ServerOutboundTransportObserver(WriteQueue queue) {
+        super(queue);
     }
 
     public void onMetadata(Http2Headers headers, boolean endStream) {
         checkSendMeta(headers, endStream);
-        ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
+        writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, endStream), true)
             .addListener(future -> {
                 if (!future.isSuccess()) {
                     LOGGER.warn("send header error endStream=" + endStream, future.cause());
@@ -69,16 +63,17 @@ public class ServerOutboundTransportObserver extends OutboundTransportObserver {
 
     @Override
     protected void doOnData(byte[] data, boolean endStream) {
-        ByteBuf buf = ctx.alloc().buffer();
-        buf.writeByte(getCompressFlag());
-        buf.writeInt(data.length);
-        buf.writeBytes(data);
-        onData(buf, endStream);
+        writeQueue.enqueue(DataQueueCommand.createGrpcCommand(data, endStream, false), true)
+            .addListener(future -> {
+                if (!future.isSuccess()) {
+                    LOGGER.warn("send data error endStream=" + endStream, future.cause());
+                }
+            });
     }
 
     @Override
     protected void doOnError(GrpcStatus status) {
-        ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
+        writeQueue.enqueue(CancelQueueCommand.createCommand(status), true)
             .addListener(future -> {
                 if (!future.isSuccess()) {
                     LOGGER.warn("write reset error", future.cause());
@@ -90,24 +85,4 @@ public class ServerOutboundTransportObserver extends OutboundTransportObserver {
     protected void doOnComplete() {
 
     }
-
-    public void onData(String str, boolean endStream) {
-        ByteBuf buf = ByteBufUtil.writeUtf8(ctx.alloc(), str);
-        onData(buf, endStream);
-    }
-
-    public void onData(ByteBuf buf, boolean endStream) {
-        checkSendData(endStream);
-        ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
-            .addListener(future -> {
-                if (!future.isSuccess()) {
-                    LOGGER.warn("send data error endStream=" + endStream, future.cause());
-                }
-            });
-    }
-
-    private int getCompressFlag() {
-        AbstractServerStream stream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
-        return calcCompressFlag(stream.getCompressor());
-    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 7d11b30..187db26 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -50,12 +50,15 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
                 if (future.isSuccess()) {
                     final Http2StreamChannel channel = (Http2StreamChannel) future.get();
                     channel.pipeline()
+                        .addLast(new TripleCommandOutBoundHandler())
                         .addLast(new TripleHttp2ClientResponseHandler())
                         .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
                         .addLast(new TripleClientInboundHandler());
                     channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
+                    DefaultFuture2.addTimeoutListener(req.getId(), channel::close);
+                    WriteQueue writeQueue = new WriteQueue(channel);
                     // Start call only when the channel creation is successful
-                    stream.startCall(channel, promise);
+                    stream.startCall(writeQueue, promise);
                 } else {
                     promise.tryFailure(future.cause());
                     DefaultFuture2.getFuture(req.getId()).cancel();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCommandOutBoundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCommandOutBoundHandler.java
new file mode 100644
index 0000000..7a0f5ea
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCommandOutBoundHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+public class TripleCommandOutBoundHandler extends ChannelOutboundHandlerAdapter {
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof QueuedCommand.AbstractQueuedCommand) {
+            QueuedCommand.AbstractQueuedCommand command = (QueuedCommand.AbstractQueuedCommand) msg;
+            command.send(ctx, promise);
+        } else {
+            super.write(ctx, msg, promise);
+        }
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index ab232f7..fadcb3c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -27,6 +27,8 @@ import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.ProviderModel;
 import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
+import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.TextDataQueueCommand;
 import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
 
 import io.netty.channel.Channel;
@@ -125,24 +127,25 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
     public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
         final Http2Headers headers = msg.headers();
-        ServerOutboundTransportObserver transportObserver = new ServerOutboundTransportObserver(ctx);
+        WriteQueue writeQueue = new WriteQueue(ctx.channel());
+        ServerOutboundTransportObserver transportObserver = new ServerOutboundTransportObserver(writeQueue);
 
         if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
-            responsePlainTextError(transportObserver, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
+            responsePlainTextError(writeQueue, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
                 GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                     .withDescription(String.format("Method '%s' is not supported", headers.method())));
             return;
         }
 
         if (headers.path() == null) {
-            responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
+            responsePlainTextError(writeQueue, HttpResponseStatus.NOT_FOUND.code(),
                 GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
             return;
         }
 
         final String path = headers.path().toString();
         if (path.charAt(0) != '/') {
-            responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
+            responsePlainTextError(writeQueue, HttpResponseStatus.NOT_FOUND.code(),
                 GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
                     .withDescription(String.format("Expected path to start with /: %s", path)));
             return;
@@ -150,7 +153,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
         final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
         if (contentType == null) {
-            responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+            responsePlainTextError(writeQueue, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
                 GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
                     .withDescription("Content-Type is missing from the request"));
             return;
@@ -158,7 +161,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
         final String contentString = contentType.toString();
         if (!supportContentType(contentString)) {
-            responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+            responsePlainTextError(writeQueue, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
                 GrpcStatus.fromCode(Code.INTERNAL.code)
                     .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
             return;
@@ -260,14 +263,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
     }
 
-    private void responsePlainTextError(ServerOutboundTransportObserver observer, int code, GrpcStatus status) {
+    private void responsePlainTextError(WriteQueue writeQueue, int code, GrpcStatus status) {
         Http2Headers headers = new DefaultHttp2Headers(true)
             .status(String.valueOf(code))
             .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
             .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
             .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.TEXT_PLAIN_UTF8);
-        observer.onMetadata(headers, false);
-        observer.onData(status.description, true);
+        writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, false), false);
+        writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description, true), true);
     }
 
     private void responseErr(ServerOutboundTransportObserver observer, GrpcStatus status) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index bb1567a..3c3c120 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -79,6 +79,7 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
             @Override
             protected void initChannel(Channel ch) {
                 final ChannelPipeline p = ch.pipeline();
+                p.addLast(new TripleCommandOutBoundHandler());
                 p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
                 // TODO constraint MAX DATA_SIZE
                 p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/WriteQueue.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/WriteQueue.java
new file mode 100644
index 0000000..46da6d89
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/WriteQueue.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class WriteQueue {
+
+    static final int DEQUE_CHUNK_SIZE = 128;
+    private final Channel channel;
+    private final Queue<QueuedCommand> queue;
+    private final AtomicBoolean scheduled;
+
+    public WriteQueue(Channel channel) {
+        this.channel = channel;
+        queue = new ConcurrentLinkedQueue<>();
+        scheduled = new AtomicBoolean(false);
+    }
+
+    public ChannelPromise enqueue(QueuedCommand command, boolean flush) {
+        ChannelPromise promise = command.promise();
+        if (promise == null) {
+            promise = channel.newPromise();
+            command.promise(promise);
+        }
+        queue.add(command);
+        if (flush) {
+            scheduleFlush();
+        }
+        return promise;
+    }
+
+    public void scheduleFlush() {
+        if (scheduled.compareAndSet(false, true)) {
+            channel.eventLoop().execute(this::flush);
+        }
+    }
+
+    private void flush() {
+        try {
+            QueuedCommand cmd;
+            int i = 0;
+            boolean flushedOnce = false;
+            while ((cmd = queue.poll()) != null) {
+                cmd.run(channel);
+                i++;
+                if (i == DEQUE_CHUNK_SIZE) {
+                    i = 0;
+                    cmd.setFlush(true);
+                    channel.flush();
+                    flushedOnce = true;
+                }
+            }
+            if (i != 0 || !flushedOnce) {
+                channel.flush();
+            }
+        } finally {
+            scheduled.set(false);
+            if (!queue.isEmpty()) {
+                scheduleFlush();
+            }
+        }
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/CancelQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/CancelQueueCommand.java
new file mode 100644
index 0000000..b7d89f7
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/CancelQueueCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
+import io.netty.handler.codec.http2.Http2Error;
+
+public class CancelQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+    private final GrpcStatus status;
+
+    private CancelQueueCommand(GrpcStatus status) {
+        this.status = status;
+    }
+
+    public static CancelQueueCommand createCommand(GrpcStatus status) {
+        return new CancelQueueCommand(status);
+    }
+
+
+    @Override
+    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+        ctx.write(new DefaultHttp2ResetFrame(Http2Error.CANCEL), promise);
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java
new file mode 100644
index 0000000..bc98842
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.AbstractStream;
+import org.apache.dubbo.rpc.protocol.tri.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.IdentityCompressor;
+import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+
+public class DataQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+    private final byte[] data;
+
+    private final boolean endStream;
+
+    private final boolean client;
+
+    private DataQueueCommand(byte[] data, boolean endStream, boolean client) {
+        this.data = data;
+        this.endStream = endStream;
+        this.client = client;
+    }
+
+    private DataQueueCommand(boolean endStream, boolean client) {
+        this(null, endStream, client);
+    }
+
+    private DataQueueCommand(boolean endStream) {
+        this(null, endStream, false);
+    }
+
+    public static DataQueueCommand createGrpcCommand(byte[] data, boolean endStream, boolean client) {
+        return new DataQueueCommand(data, endStream, client);
+    }
+
+    public static DataQueueCommand createGrpcCommand(boolean endStream) {
+        return new DataQueueCommand(endStream);
+    }
+
+    @Override
+    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+        if (data == null) {
+            ctx.write(new DefaultHttp2DataFrame(endStream), promise);
+        } else {
+            ByteBuf buf = ctx.alloc().buffer();
+            buf.writeByte(getCompressFlag(ctx));
+            buf.writeInt(data.length);
+            buf.writeBytes(data);
+            ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
+        }
+    }
+
+    private int getCompressFlag(ChannelHandlerContext ctx) {
+        AbstractStream stream = client ? ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get() : ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+        return calcCompressFlag(stream.getCompressor());
+    }
+
+    protected int calcCompressFlag(Compressor compressor) {
+        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+            return 0;
+        }
+        return 1;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/FlushQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/FlushQueueCommand.java
new file mode 100644
index 0000000..9563056
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/FlushQueueCommand.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * Queued command that writes nothing itself and only flushes the handler context.
+ */
+public class FlushQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+    /**
+     * Flushes the context.
+     * <p>
+     * NOTE(review): {@code promise} is not used here, so this method never completes it;
+     * confirm that no caller waits on the promise of a flush command.
+     *
+     * @param ctx     handler context to flush
+     * @param promise unused
+     */
+    @Override
+    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+        ctx.flush();
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/HeaderQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/HeaderQueueCommand.java
new file mode 100644
index 0000000..4936a28
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/HeaderQueueCommand.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.Metadata;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2Headers;
+
+/**
+ * Queued command that writes an HTTP/2 HEADERS frame.
+ * <p>
+ * Instances are created through the static factories. Headers can be supplied either as
+ * {@link Http2Headers} or as {@link Metadata}, which is copied into a new
+ * {@link DefaultHttp2Headers} when the command is created.
+ */
+public class HeaderQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+    private final Http2Headers headers;
+
+    private final boolean endStream;
+
+    private HeaderQueueCommand(Http2Headers headers, boolean endStream) {
+        this.headers = headers;
+        this.endStream = endStream;
+    }
+
+    private HeaderQueueCommand(Metadata headers, boolean endStream) {
+        this(getHttp2Headers(headers), endStream);
+    }
+
+    /**
+     * Creates a headers command from metadata with {@code endStream} set to {@code false}.
+     *
+     * @param headers metadata to copy into the frame headers
+     * @return a new command instance
+     */
+    public static HeaderQueueCommand createHeaders(Metadata headers) {
+        return createHeaders(headers, false);
+    }
+
+    /**
+     * Creates a headers command from metadata.
+     *
+     * @param headers   metadata to copy into the frame headers
+     * @param endStream end-of-stream flag of the frame
+     * @return a new command instance
+     */
+    public static HeaderQueueCommand createHeaders(Metadata headers, boolean endStream) {
+        return new HeaderQueueCommand(headers, endStream);
+    }
+
+    /**
+     * Creates a headers command from HTTP/2 headers with {@code endStream} set to {@code false}.
+     *
+     * @param headers headers to send; stored as given, not copied
+     * @return a new command instance
+     */
+    public static HeaderQueueCommand createHeaders(Http2Headers headers) {
+        return createHeaders(headers, false);
+    }
+
+    /**
+     * Creates a headers command from HTTP/2 headers.
+     *
+     * @param headers   headers to send; stored as given, not copied
+     * @param endStream end-of-stream flag of the frame
+     * @return a new command instance
+     */
+    public static HeaderQueueCommand createHeaders(Http2Headers headers, boolean endStream) {
+        return new HeaderQueueCommand(headers, endStream);
+    }
+
+    /**
+     * Creates a trailers command: a headers command with {@code endStream} set to {@code true}.
+     *
+     * @param headers metadata to copy into the frame headers
+     * @return a new command instance
+     */
+    public static HeaderQueueCommand createTrailers(Metadata headers) {
+        return createHeaders(headers, true);
+    }
+
+    public Http2Headers getHeaders() {
+        return headers;
+    }
+
+    public boolean isEndStream() {
+        return endStream;
+    }
+
+    /**
+     * Writes the headers as a {@link DefaultHttp2HeadersFrame}.
+     *
+     * @param ctx     handler context to write to
+     * @param promise promise handed to {@code ctx.write} for the frame
+     */
+    @Override
+    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+        final DefaultHttp2HeadersFrame frame = new DefaultHttp2HeadersFrame(headers, endStream);
+        ctx.write(frame, promise);
+    }
+
+    /**
+     * Copies every metadata entry into a new {@link DefaultHttp2Headers}, using {@code set}
+     * for each key/value pair.
+     */
+    private static Http2Headers getHttp2Headers(Metadata metadata) {
+        final Http2Headers converted = new DefaultHttp2Headers(true);
+        metadata.forEach(entry -> converted.set(entry.getKey(), entry.getValue()));
+        return converted;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
new file mode 100644
index 0000000..c06ecfa
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * A unit of outbound work that can be queued and later written to a channel.
+ */
+public interface QueuedCommand {
+
+    /**
+     * Sets whether the command should be followed by a flush when it is sent.
+     *
+     * @param flush {@code true} to flush after sending
+     */
+    void setFlush(boolean flush);
+
+    /**
+     * @return the promise associated with this command, or {@code null} if none was set
+     */
+    ChannelPromise promise();
+
+    /**
+     * Associates a promise with this command.
+     *
+     * @param promise promise to use when the command is written
+     */
+    void promise(ChannelPromise promise);
+
+    /**
+     * Hands this command to the given channel.
+     *
+     * @param channel channel to write to
+     */
+    void run(Channel channel);
+
+    /**
+     * Base implementation that stores the promise and the flush flag and defines the
+     * send sequence; subclasses provide the actual write in {@link #doSend}.
+     */
+    abstract class AbstractQueuedCommand implements QueuedCommand {
+
+        private ChannelPromise promise;
+
+        protected boolean flush = false;
+
+        @Override
+        public ChannelPromise promise() {
+            return promise;
+        }
+
+        @Override
+        public void setFlush(boolean flush) {
+            this.flush = flush;
+        }
+
+        @Override
+        public void promise(ChannelPromise promise) {
+            this.promise = promise;
+        }
+
+        /**
+         * Writes this command object itself into the channel together with the stored promise.
+         */
+        @Override
+        public void run(Channel channel) {
+            channel.write(this, promise);
+        }
+
+        /**
+         * Performs the write via {@link #doSend} and then flushes the context when the
+         * flush flag is set.
+         *
+         * @param ctx     handler context to write to
+         * @param promise promise passed through to {@link #doSend}
+         */
+        public final void send(ChannelHandlerContext ctx, ChannelPromise promise) {
+            doSend(ctx, promise);
+            if (flush) {
+                ctx.flush();
+            }
+        }
+
+        /**
+         * Performs the command-specific write.
+         *
+         * @param ctx     handler context to write to
+         * @param promise promise for the write
+         */
+        public abstract void doSend(ChannelHandlerContext ctx, ChannelPromise promise);
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/TextDataQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/TextDataQueueCommand.java
new file mode 100644
index 0000000..547c747
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/TextDataQueueCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+
+/**
+ * Queued command that writes a string, encoded as UTF-8, as an HTTP/2 DATA frame.
+ */
+public class TextDataQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+    private final String data;
+
+    private final boolean endStream;
+
+    private TextDataQueueCommand(String data, boolean endStream) {
+        this.data = data;
+        this.endStream = endStream;
+    }
+
+    /**
+     * Creates a text data command.
+     *
+     * @param data      text to send
+     * @param endStream end-of-stream flag of the frame
+     * @return a new command instance
+     */
+    public static TextDataQueueCommand createCommand(String data, boolean endStream) {
+        return new TextDataQueueCommand(data, endStream);
+    }
+
+    /**
+     * Encodes the text into a buffer from the context's allocator and writes it as a
+     * {@link DefaultHttp2DataFrame}.
+     *
+     * @param ctx     handler context used to allocate the buffer and to write the frame
+     * @param promise promise handed to {@code ctx.write} for the frame
+     */
+    @Override
+    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+        final ByteBuf payload = ByteBufUtil.writeUtf8(ctx.alloc(), data);
+        final DefaultHttp2DataFrame frame = new DefaultHttp2DataFrame(payload, endStream);
+        ctx.write(frame, promise);
+    }
+}