You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/11/01 02:32:14 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] Add WriteQueue for Triple
(#9160)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new ef2c0b3 [3.0-Triple] Add WriteQueue for Triple (#9160)
ef2c0b3 is described below
commit ef2c0b3fcc3c42e7afb28dbc7cfe5d78f824c5c5
Author: earthchen <yo...@duobei.com>
AuthorDate: Sun Oct 31 21:31:58 2021 -0500
[3.0-Triple] Add WriteQueue for Triple (#9160)
* Add WriteQueue
* remove unused
* fix rat
* delay flush
* add flush command
---
.../rpc/protocol/tri/AbstractClientStream.java | 6 +-
.../tri/ClientOutboundTransportObserver.java | 36 +++------
.../protocol/tri/OutboundTransportObserver.java | 13 ++--
.../tri/ServerOutboundTransportObserver.java | 51 ++++---------
.../protocol/tri/TripleClientRequestHandler.java | 5 +-
.../protocol/tri/TripleCommandOutBoundHandler.java | 37 +++++++++
.../tri/TripleHttp2FrameServerHandler.java | 21 +++---
.../rpc/protocol/tri/TripleHttp2Protocol.java | 1 +
.../apache/dubbo/rpc/protocol/tri/WriteQueue.java | 87 ++++++++++++++++++++++
.../protocol/tri/command/CancelQueueCommand.java | 44 +++++++++++
.../rpc/protocol/tri/command/DataQueueCommand.java | 84 +++++++++++++++++++++
.../protocol/tri/command/FlushQueueCommand.java | 29 ++++++++
.../protocol/tri/command/HeaderQueueCommand.java | 81 ++++++++++++++++++++
.../rpc/protocol/tri/command/QueuedCommand.java | 69 +++++++++++++++++
.../protocol/tri/command/TextDataQueueCommand.java | 46 ++++++++++++
15 files changed, 524 insertions(+), 86 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 2d75ca0..b4acdbc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -40,7 +40,6 @@ import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.util.AsciiString;
import java.io.ByteArrayInputStream;
@@ -141,12 +140,11 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
}
- protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
+ protected void startCall(WriteQueue queue, ChannelPromise promise) {
execute(() -> {
- final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(channel, promise);
+ final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(queue, promise);
subscribe(clientTransportObserver);
try {
- DefaultFuture2.addTimeoutListener(getRequestId(), channel::close);
doOnStartCall();
} catch (Throwable throwable) {
cancel(throwable);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
index c939ae5..672802b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
@@ -17,15 +17,11 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.buffer.ByteBuf;
+import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+
import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
-import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
/**
* Send stream data to remote
@@ -34,18 +30,15 @@ import io.netty.handler.codec.http2.Http2StreamChannel;
public class ClientOutboundTransportObserver extends OutboundTransportObserver {
private final ChannelPromise promise;
- private final Http2StreamChannel streamChannel;
- public ClientOutboundTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
- this.streamChannel = channel;
+ public ClientOutboundTransportObserver(WriteQueue writeQueue, ChannelPromise promise) {
+ super(writeQueue);
this.promise = promise;
}
@Override
protected void doOnMetadata(Metadata metadata, boolean endStream) {
- final Http2Headers headers = new DefaultHttp2Headers(true);
- metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
- streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
+ writeQueue.enqueue(HeaderQueueCommand.createHeaders(metadata, endStream), true)
.addListener(future -> {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
@@ -55,11 +48,7 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
@Override
protected void doOnData(byte[] data, boolean endStream) {
- ByteBuf buf = streamChannel.alloc().buffer();
- buf.writeByte(getCompressFlag());
- buf.writeInt(data.length);
- buf.writeBytes(data);
- streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
+ writeQueue.enqueue(DataQueueCommand.createGrpcCommand(data, endStream, true), true)
.addListener(future -> {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
@@ -69,7 +58,7 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
@Override
protected void doOnError(GrpcStatus status) {
- streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
+ writeQueue.enqueue(CancelQueueCommand.createCommand(status), true)
.addListener(future -> {
if (future.isSuccess()) {
promise.trySuccess();
@@ -81,7 +70,7 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
@Override
protected void doOnComplete() {
- streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
+ writeQueue.enqueue(DataQueueCommand.createGrpcCommand(true), true)
.addListener(future -> {
if (future.isSuccess()) {
promise.trySuccess();
@@ -90,9 +79,4 @@ public class ClientOutboundTransportObserver extends OutboundTransportObserver {
}
});
}
-
- private int getCompressFlag() {
- AbstractClientStream stream = streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).get();
- return calcCompressFlag(stream.getCompressor());
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
index 4194b4f..0a99ab5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
@@ -23,6 +23,11 @@ package org.apache.dubbo.rpc.protocol.tri;
public abstract class OutboundTransportObserver implements TransportObserver {
protected final TransportState state = new TransportState();
+ protected final WriteQueue writeQueue;
+
+ public OutboundTransportObserver(WriteQueue writeQueue) {
+ this.writeQueue = writeQueue;
+ }
@Override
public void onMetadata(Metadata metadata, boolean endStream) {
@@ -87,14 +92,6 @@ public abstract class OutboundTransportObserver implements TransportObserver {
protected abstract void doOnComplete();
-
- protected int calcCompressFlag(Compressor compressor) {
- if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
- return 0;
- }
- return 1;
- }
-
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
index 4f74ec9..584be8d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
@@ -19,30 +19,24 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
-import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
-import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
public class ServerOutboundTransportObserver extends OutboundTransportObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerOutboundTransportObserver.class);
- private final ChannelHandlerContext ctx;
-
- public ServerOutboundTransportObserver(ChannelHandlerContext ctx) {
- this.ctx = ctx;
+ public ServerOutboundTransportObserver(WriteQueue queue) {
+ super(queue);
}
public void onMetadata(Http2Headers headers, boolean endStream) {
checkSendMeta(headers, endStream);
- ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
+ writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, endStream), true)
.addListener(future -> {
if (!future.isSuccess()) {
LOGGER.warn("send header error endStream=" + endStream, future.cause());
@@ -69,16 +63,17 @@ public class ServerOutboundTransportObserver extends OutboundTransportObserver {
@Override
protected void doOnData(byte[] data, boolean endStream) {
- ByteBuf buf = ctx.alloc().buffer();
- buf.writeByte(getCompressFlag());
- buf.writeInt(data.length);
- buf.writeBytes(data);
- onData(buf, endStream);
+ writeQueue.enqueue(DataQueueCommand.createGrpcCommand(data, endStream, false), true)
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOGGER.warn("send data error endStream=" + endStream, future.cause());
+ }
+ });
}
@Override
protected void doOnError(GrpcStatus status) {
- ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
+ writeQueue.enqueue(CancelQueueCommand.createCommand(status), true)
.addListener(future -> {
if (!future.isSuccess()) {
LOGGER.warn("write reset error", future.cause());
@@ -90,24 +85,4 @@ public class ServerOutboundTransportObserver extends OutboundTransportObserver {
protected void doOnComplete() {
}
-
- public void onData(String str, boolean endStream) {
- ByteBuf buf = ByteBufUtil.writeUtf8(ctx.alloc(), str);
- onData(buf, endStream);
- }
-
- public void onData(ByteBuf buf, boolean endStream) {
- checkSendData(endStream);
- ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
- .addListener(future -> {
- if (!future.isSuccess()) {
- LOGGER.warn("send data error endStream=" + endStream, future.cause());
- }
- });
- }
-
- private int getCompressFlag() {
- AbstractServerStream stream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
- return calcCompressFlag(stream.getCompressor());
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 7d11b30..187db26 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -50,12 +50,15 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
if (future.isSuccess()) {
final Http2StreamChannel channel = (Http2StreamChannel) future.get();
channel.pipeline()
+ .addLast(new TripleCommandOutBoundHandler())
.addLast(new TripleHttp2ClientResponseHandler())
.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
.addLast(new TripleClientInboundHandler());
channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
+ DefaultFuture2.addTimeoutListener(req.getId(), channel::close);
+ WriteQueue writeQueue = new WriteQueue(channel);
// Start call only when the channel creation is successful
- stream.startCall(channel, promise);
+ stream.startCall(writeQueue, promise);
} else {
promise.tryFailure(future.cause());
DefaultFuture2.getFuture(req.getId()).cancel();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCommandOutBoundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCommandOutBoundHandler.java
new file mode 100644
index 0000000..7a0f5ea
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCommandOutBoundHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+public class TripleCommandOutBoundHandler extends ChannelOutboundHandlerAdapter {
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ if (msg instanceof QueuedCommand.AbstractQueuedCommand) {
+ QueuedCommand.AbstractQueuedCommand command = (QueuedCommand.AbstractQueuedCommand) msg;
+ command.send(ctx, promise);
+ } else {
+ super.write(ctx, msg, promise);
+ }
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index ab232f7..fadcb3c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -27,6 +27,8 @@ import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
+import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.TextDataQueueCommand;
import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
import io.netty.channel.Channel;
@@ -125,24 +127,25 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
final Http2Headers headers = msg.headers();
- ServerOutboundTransportObserver transportObserver = new ServerOutboundTransportObserver(ctx);
+ WriteQueue writeQueue = new WriteQueue(ctx.channel());
+ ServerOutboundTransportObserver transportObserver = new ServerOutboundTransportObserver(writeQueue);
if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
- responsePlainTextError(transportObserver, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
+ responsePlainTextError(writeQueue, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withDescription(String.format("Method '%s' is not supported", headers.method())));
return;
}
if (headers.path() == null) {
- responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
+ responsePlainTextError(writeQueue, HttpResponseStatus.NOT_FOUND.code(),
GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
return;
}
final String path = headers.path().toString();
if (path.charAt(0) != '/') {
- responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
+ responsePlainTextError(writeQueue, HttpResponseStatus.NOT_FOUND.code(),
GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
.withDescription(String.format("Expected path to start with /: %s", path)));
return;
@@ -150,7 +153,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
if (contentType == null) {
- responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+ responsePlainTextError(writeQueue, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
.withDescription("Content-Type is missing from the request"));
return;
@@ -158,7 +161,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final String contentString = contentType.toString();
if (!supportContentType(contentString)) {
- responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+ responsePlainTextError(writeQueue, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
GrpcStatus.fromCode(Code.INTERNAL.code)
.withDescription(String.format("Content-Type '%s' is not supported", contentString)));
return;
@@ -260,14 +263,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
}
- private void responsePlainTextError(ServerOutboundTransportObserver observer, int code, GrpcStatus status) {
+ private void responsePlainTextError(WriteQueue writeQueue, int code, GrpcStatus status) {
Http2Headers headers = new DefaultHttp2Headers(true)
.status(String.valueOf(code))
.setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
.set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.TEXT_PLAIN_UTF8);
- observer.onMetadata(headers, false);
- observer.onData(status.description, true);
+ writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, false), false);
+ writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description, true), true);
}
private void responseErr(ServerOutboundTransportObserver observer, GrpcStatus status) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index bb1567a..3c3c120 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -79,6 +79,7 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
@Override
protected void initChannel(Channel ch) {
final ChannelPipeline p = ch.pipeline();
+ p.addLast(new TripleCommandOutBoundHandler());
p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
// TODO constraint MAX DATA_SIZE
p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/WriteQueue.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/WriteQueue.java
new file mode 100644
index 0000000..46da6d89
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/WriteQueue.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class WriteQueue {
+
+ static final int DEQUE_CHUNK_SIZE = 128;
+ private final Channel channel;
+ private final Queue<QueuedCommand> queue;
+ private final AtomicBoolean scheduled;
+
+ public WriteQueue(Channel channel) {
+ this.channel = channel;
+ queue = new ConcurrentLinkedQueue<>();
+ scheduled = new AtomicBoolean(false);
+ }
+
+ public ChannelPromise enqueue(QueuedCommand command, boolean flush) {
+ ChannelPromise promise = command.promise();
+ if (promise == null) {
+ promise = channel.newPromise();
+ command.promise(promise);
+ }
+ queue.add(command);
+ if (flush) {
+ scheduleFlush();
+ }
+ return promise;
+ }
+
+ public void scheduleFlush() {
+ if (scheduled.compareAndSet(false, true)) {
+ channel.eventLoop().execute(this::flush);
+ }
+ }
+
+ private void flush() {
+ try {
+ QueuedCommand cmd;
+ int i = 0;
+ boolean flushedOnce = false;
+ while ((cmd = queue.poll()) != null) {
+ cmd.run(channel);
+ i++;
+ if (i == DEQUE_CHUNK_SIZE) {
+ i = 0;
+ cmd.setFlush(true);
+ channel.flush();
+ flushedOnce = true;
+ }
+ }
+ if (i != 0 || !flushedOnce) {
+ channel.flush();
+ }
+ } finally {
+ scheduled.set(false);
+ if (!queue.isEmpty()) {
+ scheduleFlush();
+ }
+ }
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/CancelQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/CancelQueueCommand.java
new file mode 100644
index 0000000..b7d89f7
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/CancelQueueCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
+import io.netty.handler.codec.http2.Http2Error;
+
+public class CancelQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+ private final GrpcStatus status;
+
+ private CancelQueueCommand(GrpcStatus status) {
+ this.status = status;
+ }
+
+ public static CancelQueueCommand createCommand(GrpcStatus status) {
+ return new CancelQueueCommand(status);
+ }
+
+
+ @Override
+ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+ ctx.write(new DefaultHttp2ResetFrame(Http2Error.CANCEL), promise);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java
new file mode 100644
index 0000000..bc98842
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.AbstractStream;
+import org.apache.dubbo.rpc.protocol.tri.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.IdentityCompressor;
+import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+
+public class DataQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+ private final byte[] data;
+
+ private final boolean endStream;
+
+ private final boolean client;
+
+ private DataQueueCommand(byte[] data, boolean endStream, boolean client) {
+ this.data = data;
+ this.endStream = endStream;
+ this.client = client;
+ }
+
+ private DataQueueCommand(boolean endStream, boolean client) {
+ this(null, endStream, client);
+ }
+
+ private DataQueueCommand(boolean endStream) {
+ this(null, endStream, false);
+ }
+
+ public static DataQueueCommand createGrpcCommand(byte[] data, boolean endStream, boolean client) {
+ return new DataQueueCommand(data, endStream, client);
+ }
+
+ public static DataQueueCommand createGrpcCommand(boolean endStream) {
+ return new DataQueueCommand(endStream);
+ }
+
+ @Override
+ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+ if (data == null) {
+ ctx.write(new DefaultHttp2DataFrame(endStream), promise);
+ } else {
+ ByteBuf buf = ctx.alloc().buffer();
+ buf.writeByte(getCompressFlag(ctx));
+ buf.writeInt(data.length);
+ buf.writeBytes(data);
+ ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
+ }
+ }
+
+ private int getCompressFlag(ChannelHandlerContext ctx) {
+ AbstractStream stream = client ? ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get() : ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+ return calcCompressFlag(stream.getCompressor());
+ }
+
+ protected int calcCompressFlag(Compressor compressor) {
+ if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+ return 0;
+ }
+ return 1;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/FlushQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/FlushQueueCommand.java
new file mode 100644
index 0000000..9563056
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/FlushQueueCommand.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+public class FlushQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+ @Override
+ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+ ctx.flush();
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/HeaderQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/HeaderQueueCommand.java
new file mode 100644
index 0000000..4936a28
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/HeaderQueueCommand.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.Metadata;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2Headers;
+
+public class HeaderQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+ private final Http2Headers headers;
+
+ private final boolean endStream;
+
+ private HeaderQueueCommand(Metadata headers, boolean endStream) {
+ this(getHttp2Headers(headers), endStream);
+ }
+
+ private HeaderQueueCommand(Http2Headers headers, boolean endStream) {
+ this.headers = headers;
+ this.endStream = endStream;
+ }
+
+ public static HeaderQueueCommand createHeaders(Metadata headers, boolean endStream) {
+ return new HeaderQueueCommand(getHttp2Headers(headers), endStream);
+ }
+
+ public static HeaderQueueCommand createHeaders(Metadata headers) {
+ return new HeaderQueueCommand(headers, false);
+ }
+
+ public static HeaderQueueCommand createHeaders(Http2Headers headers) {
+ return new HeaderQueueCommand(headers, false);
+ }
+
+ public static HeaderQueueCommand createHeaders(Http2Headers headers, boolean endStream) {
+ return new HeaderQueueCommand(headers, endStream);
+ }
+
+ public static HeaderQueueCommand createTrailers(Metadata headers) {
+ return new HeaderQueueCommand(headers, true);
+ }
+
+ public Http2Headers getHeaders() {
+ return headers;
+ }
+
+ public boolean isEndStream() {
+ return endStream;
+ }
+
+ private static Http2Headers getHttp2Headers(Metadata metadata) {
+ Http2Headers http2Headers = new DefaultHttp2Headers(true);
+ metadata.forEach((kv) -> http2Headers.set(kv.getKey(), kv.getValue()));
+ return http2Headers;
+ }
+
+ @Override
+ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+ ctx.write(new DefaultHttp2HeadersFrame(headers, endStream), promise);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
new file mode 100644
index 0000000..c06ecfa
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+public interface QueuedCommand {
+
+ void setFlush(boolean flush);
+
+ ChannelPromise promise();
+
+ void promise(ChannelPromise promise);
+
+ void run(Channel channel);
+
+ abstract class AbstractQueuedCommand implements QueuedCommand {
+
+ private ChannelPromise promise;
+
+ protected boolean flush = false;
+
+ @Override
+ public ChannelPromise promise() {
+ return promise;
+ }
+
+ public void setFlush(boolean flush) {
+ this.flush = flush;
+ }
+
+ @Override
+ public void promise(ChannelPromise promise) {
+ this.promise = promise;
+ }
+
+ @Override
+ public void run(Channel channel) {
+ channel.write(this, promise);
+ }
+
+ public final void send(ChannelHandlerContext ctx, ChannelPromise promise) {
+ doSend(ctx, promise);
+ if (flush) {
+ ctx.flush();
+ }
+ }
+
+ public abstract void doSend(ChannelHandlerContext ctx, ChannelPromise promise);
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/TextDataQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/TextDataQueueCommand.java
new file mode 100644
index 0000000..547c747
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/TextDataQueueCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+
+public class TextDataQueueCommand extends QueuedCommand.AbstractQueuedCommand {
+
+ private final String data;
+
+ private final boolean endStream;
+
+ private TextDataQueueCommand(String text, boolean endStream) {
+ this.data = text;
+ this.endStream = endStream;
+ }
+
+ public static TextDataQueueCommand createCommand(String data, boolean endStream) {
+ return new TextDataQueueCommand(data, endStream);
+ }
+
+ @Override
+ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+ ByteBuf buf = ByteBufUtil.writeUtf8(ctx.alloc(), data);
+ ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
+ }
+}