You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/08/30 10:51:22 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] Remove operation handler (#8637)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new dbfad05  [3.0-Triple] Remove operation handler (#8637)
dbfad05 is described below

commit dbfad0504bc6653aaf6827133b337b494dfd6167
Author: GuoHao <gu...@gmail.com>
AuthorDate: Mon Aug 30 18:50:44 2021 +0800

    [3.0-Triple] Remove operation handler (#8637)
    
    * Add more comments
    
    * Remove unused operation handler
    
    * Fix UT
---
 .../rpc/protocol/tri/AbstractClientStream.java     |  6 +--
 .../dubbo/rpc/protocol/tri/AbstractStream.java     | 54 +++++++++++-----------
 .../dubbo/rpc/protocol/tri/ClientStream.java       |  8 ++--
 .../rpc/protocol/tri/ClientTransportObserver.java  |  6 +--
 .../dubbo/rpc/protocol/tri/ServerStream.java       | 14 +++---
 .../rpc/protocol/tri/ServerTransportObserver.java  |  8 +---
 .../org/apache/dubbo/rpc/protocol/tri/Stream.java  | 46 ++++++++++++------
 .../dubbo/rpc/protocol/tri/TransportObserver.java  | 20 ++------
 .../protocol/tri/TripleClientInboundHandler.java   |  2 +-
 .../tri/TripleHttp2ClientResponseHandler.java      |  6 +--
 .../tri/TripleHttp2FrameServerHandler.java         |  6 +--
 .../protocol/tri/TripleServerInboundHandler.java   |  2 +-
 .../dubbo/rpc/protocol/tri/UnaryClientStream.java  |  2 +-
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  | 14 +++---
 .../rpc/protocol/tri/UnaryClientStreamTest.java    |  2 +-
 15 files changed, 99 insertions(+), 97 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 9f676b9..7af76cf 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -169,9 +169,9 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         public void onNext(Object data) {
             RpcInvocation invocation = (RpcInvocation) data;
             final Metadata metadata = createRequestMeta(invocation);
-            getTransportSubscriber().tryOnMetadata(metadata, false);
+            getTransportSubscriber().onMetadata(metadata, false);
             final byte[] bytes = encodeRequest(invocation);
-            getTransportSubscriber().tryOnData(bytes, false);
+            getTransportSubscriber().onData(bytes, false);
         }
 
         @Override
@@ -181,7 +181,7 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
 
         @Override
         public void onCompleted() {
-            getTransportSubscriber().tryOnComplete();
+            getTransportSubscriber().onComplete();
         }
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 5934871..9b74798 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -56,8 +56,8 @@ public abstract class AbstractStream implements Stream {
         ThreadFactory tripleTF = new NamedInternalThreadFactory("tri-callback", true);
         for (int i = 0; i < 4; i++) {
             final ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 0, TimeUnit.DAYS,
-                new LinkedBlockingQueue<>(1024),
-                tripleTF, new ThreadPoolExecutor.AbortPolicy());
+                    new LinkedBlockingQueue<>(1024),
+                    tripleTF, new ThreadPoolExecutor.AbortPolicy());
             CALLBACK_EXECUTORS.add(tp);
         }
 
@@ -85,7 +85,7 @@ public abstract class AbstractStream implements Stream {
         this.executor = executor;
         final String value = url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY);
         this.multipleSerialization = ExtensionLoader.getExtensionLoader(MultipleSerialization.class)
-            .getExtension(value);
+                .getExtension(value);
         this.streamObserver = createStreamObserver();
         this.transportObserver = createTransportObserver();
     }
@@ -185,34 +185,35 @@ public abstract class AbstractStream implements Stream {
     public TransportObserver asTransportObserver() {
         return transportObserver;
     }
-    protected void transportError(GrpcStatus status, Map<String,Object> attachments) {
+
+    protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
         // set metadata
         Metadata metadata = getMetaData(status);
-        getTransportSubscriber().tryOnMetadata(metadata, false);
+        getTransportSubscriber().onMetadata(metadata, false);
         // set trailers
         Metadata trailers = getTrailers(status);
         if (attachments != null) {
             convertAttachment(trailers, attachments);
         }
-        getTransportSubscriber().tryOnMetadata(trailers, true);
+        getTransportSubscriber().onMetadata(trailers, true);
         if (LOGGER.isErrorEnabled()) {
             LOGGER.error("[Triple-Server-Error] " + status.toMessage());
         }
     }
 
     protected void transportError(GrpcStatus status) {
-        transportError(status,null);
+        transportError(status, null);
     }
 
     protected void transportError(Throwable throwable) {
         GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable, throwable.getMessage());
         Metadata metadata = getMetaData(status);
-        getTransportSubscriber().tryOnMetadata(metadata, false);
+        getTransportSubscriber().onMetadata(metadata, false);
         Metadata trailers = getTrailers(status);
-        getTransportSubscriber().tryOnMetadata(trailers, true);
+        getTransportSubscriber().onMetadata(trailers, true);
         if (LOGGER.isErrorEnabled()) {
             LOGGER.error("[Triple-Server-Error] service=" + getServiceDescriptor().getServiceName()
-                + " method=" + getMethodName(), throwable);
+                    + " method=" + getMethodName(), throwable);
         }
     }
 
@@ -237,28 +238,28 @@ public abstract class AbstractStream implements Stream {
 
         Metadata metadata = new DefaultMetadata();
         Status.Builder builder = Status.newBuilder()
-            .setCode(grpcStatus.code.code)
-            .setMessage(getGrpcMessage(grpcStatus));
+                .setCode(grpcStatus.code.code)
+                .setMessage(getGrpcMessage(grpcStatus));
         Throwable throwable = grpcStatus.cause;
         if (throwable == null) {
             return metadata;
         }
         DebugInfo debugInfo = DebugInfo.newBuilder()
-            .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable))
-            // can not use now
-            // .setDetail(throwable.getMessage())
-            .build();
+                .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable))
+                // can not use now
+                // .setDetail(throwable.getMessage())
+                .build();
         builder.addDetails(Any.pack(debugInfo));
         Status status = builder.build();
         metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
-            TripleUtil.encodeBase64ASCII(status.toByteArray()));
+                TripleUtil.encodeBase64ASCII(status.toByteArray()));
         // only wrapper mode support exception serialization
         if (getMethodDescriptor() != null && !getMethodDescriptor().isNeedWrap()) {
             return metadata;
         }
         try {
             TripleWrapper.TripleExceptionWrapper exceptionWrapper = TripleUtil.wrapException(getUrl(), throwable,
-                getSerializeType(), getMultipleSerialization());
+                    getSerializeType(), getMultipleSerialization());
             String exceptionStr = TripleUtil.encodeBase64ASCII(exceptionWrapper.toByteArray());
             if (!TripleUtil.overEachHeaderListSize(exceptionStr)) {
                 metadata.put(TripleHeaderEnum.EXCEPTION_TW_BIN.getHeader(), exceptionStr);
@@ -341,7 +342,7 @@ public abstract class AbstractStream implements Stream {
         }
 
         @Override
-        public void onMetadata(Metadata metadata, boolean endStream, OperationHandler handler) {
+        public void onMetadata(Metadata metadata, boolean endStream) {
             if (headers == null) {
                 headers = metadata;
             } else {
@@ -378,24 +379,25 @@ public abstract class AbstractStream implements Stream {
         protected abstract void onError(GrpcStatus status);
 
         @Override
-        public void onComplete(OperationHandler handler) {
+        public void onComplete() {
             final GrpcStatus status = extractStatusFromMeta(getHeaders());
-            if (GrpcStatus.Code.isOk(status.code.code)) {
-                doOnComplete(handler);
+            if (Code.isOk(status.code.code)) {
+                doOnComplete();
             } else {
                 onError(status);
             }
         }
 
-        protected abstract void doOnComplete(OperationHandler handler);
+        protected abstract void doOnComplete() ;
+
 
         @Override
-        public void onData(byte[] in, boolean endStream, OperationHandler handler) {
+        public void onData(byte[] in, boolean endStream) {
             if (data == null) {
                 this.data = in;
             } else {
-                handler.operationDone(OperationResult.FAILURE, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                        .withDescription(DUPLICATED_DATA).asException());
+                onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                        .withDescription(DUPLICATED_DATA));
             }
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index a69bd6b..4e31f6e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -37,10 +37,10 @@ public class ClientStream extends AbstractClientStream implements Stream {
                 if (!metaSent) {
                     metaSent = true;
                     final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
-                    getTransportSubscriber().tryOnMetadata(metadata, false);
+                    getTransportSubscriber().onMetadata(metadata, false);
                 }
                 final byte[] bytes = encodeRequest(data);
-                getTransportSubscriber().tryOnData(bytes, false);
+                getTransportSubscriber().onData(bytes, false);
             }
 
             @Override
@@ -55,7 +55,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
         return new AbstractTransportObserver() {
 
             @Override
-            public void onData(byte[] data, boolean endStream, OperationHandler handler) {
+            public void onData(byte[] data, boolean endStream) {
                 execute(() -> {
                     final Object resp = deserializeResponse(data);
                     getStreamSubscriber().onNext(resp);
@@ -63,7 +63,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
             }
 
             @Override
-            public void onComplete(OperationHandler handler) {
+            public void onComplete() {
                 execute(() -> {
                     final GrpcStatus status = extractStatusFromMeta(getHeaders());
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 871e874..cd3f1e6 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -59,7 +59,7 @@ public class ClientTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler) {
+    public void onMetadata(Metadata metadata, boolean endStream) {
         if (!headerSent) {
             final Http2Headers headers = new DefaultHttp2Headers(true)
                     .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
@@ -78,7 +78,7 @@ public class ClientTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onData(byte[] data, boolean endStream, Stream.OperationHandler handler) {
+    public void onData(byte[] data, boolean endStream) {
         ByteBuf buf = ctx.alloc().buffer();
         buf.writeByte(0);
         buf.writeInt(data.length);
@@ -92,7 +92,7 @@ public class ClientTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onComplete(Stream.OperationHandler handler) {
+    public void onComplete() {
         if (!endStreamSent) {
             endStreamSent = true;
             streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 75e01d9..27e568c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -44,11 +44,11 @@ public class ServerStream extends AbstractServerStream implements Stream {
         @Override
         public void onNext(Object data) {
             if (!headersSent) {
-                getTransportSubscriber().tryOnMetadata(new DefaultMetadata(), false);
+                getTransportSubscriber().onMetadata(new DefaultMetadata(), false);
                 headersSent = true;
             }
             final byte[] bytes = encodeResponse(data);
-            getTransportSubscriber().tryOnData(bytes, false);
+            getTransportSubscriber().onData(bytes, false);
         }
 
         @Override
@@ -64,15 +64,15 @@ public class ServerStream extends AbstractServerStream implements Stream {
             Metadata metadata = new DefaultMetadata();
             metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), "OK");
             metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
-            getTransportSubscriber().tryOnMetadata(metadata, true);
+            getTransportSubscriber().onMetadata(metadata, true);
         }
     }
 
     private class StreamTransportObserver extends AbstractTransportObserver implements TransportObserver {
 
         @Override
-        public void onMetadata(Metadata metadata, boolean endStream, OperationHandler handler) {
-            super.onMetadata(metadata, endStream, handler);
+        public void onMetadata(Metadata metadata, boolean endStream) {
+            super.onMetadata(metadata, endStream);
             if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
                 return;
             }
@@ -88,7 +88,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
         }
 
         @Override
-        public void onData(byte[] in, boolean endStream, OperationHandler handler) {
+        public void onData(byte[] in, boolean endStream) {
             try {
                 if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
                     RpcInvocation inv = buildInvocation(getHeaders());
@@ -112,7 +112,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
         }
 
         @Override
-        public void onComplete(OperationHandler handler) {
+        public void onComplete() {
             if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
                 return;
             }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index 0b7f9ff..11b0c2d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -34,7 +34,7 @@ public class ServerTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler) {
+    public void onMetadata(Metadata metadata, boolean endStream) {
         final DefaultHttp2Headers headers = new DefaultHttp2Headers(true);
         metadata.forEach(e -> {
             headers.set(e.getKey(), e.getValue());
@@ -50,15 +50,11 @@ public class ServerTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onData(byte[] data, boolean endStream, Stream.OperationHandler handler) {
+    public void onData(byte[] data, boolean endStream) {
         ByteBuf buf = ctx.alloc().buffer();
         buf.writeByte(0);
         buf.writeInt(data.length);
         buf.writeBytes(data);
         ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false));
     }
-
-    @Override
-    public void onComplete(Stream.OperationHandler handler) {
-    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
index 4737fa6..de83eab 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
@@ -20,32 +20,50 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
 
+/**
+ * Stream acts as a bi-directional intermediate layer for streaming data processing. It serializes object instance to
+ * byte[] then send to remote, and deserializes byte[] to object instance from remote. {@link #asTransportObserver()}
+ * and {@link #subscribe(TransportObserver)} provide {@link TransportObserver} to send or receive remote data.
+ * {@link #asStreamObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
+ * as API for users fetching/emitting objects from/to remote peer.
+ */
 public interface Stream {
 
     Logger LOGGER = LoggerFactory.getLogger(Stream.class);
 
+    /**
+     * Register an upstream data observer to receive byte[] sent by this stream
+     *
+     * @param observer receives remote byte[] data
+     */
     void subscribe(TransportObserver observer);
 
+    /**
+     * Get a downstream data observer for writing byte[] data to this stream
+     *
+     * @return an observer for writing byte[] to remote peer
+     */
     TransportObserver asTransportObserver();
 
+    /**
+     * Register an upstream data observer to receive object instances deserialized by this stream
+     *
+     * @param observer receives deserialized object instances from the remote peer
+     */
     void subscribe(StreamObserver<Object> observer);
 
+    /**
+     * Get a downstream data observer for transmitting instances to application code
+     *
+     * @return an observer that accepts object instances rather than raw byte[]
+     */
     StreamObserver<Object> asStreamObserver();
 
+    /**
+     * Execute a task in stream's executor
+     *
+     * @param runnable task to run
+     */
     void execute(Runnable runnable);
 
-    enum OperationResult {
-        OK,
-        FAILURE,
-        NETWORK_FAIL
-    }
-
-    interface OperationHandler {
-
-        /**
-         * @param result operation's result
-         * @param cause  null if the operation succeed
-         */
-        void operationDone(OperationResult result, Throwable cause);
-    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 6e33c5c..64d833b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -18,25 +18,11 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 public interface TransportObserver {
-    Stream.OperationHandler EMPTY_HANDLER = (result, cause) -> {
-    };
 
-    default void tryOnMetadata(Metadata metadata, boolean endStream) {
-        onMetadata(metadata, endStream, EMPTY_HANDLER);
-    }
+    void onMetadata(Metadata metadata, boolean endStream);
 
-    default void tryOnData(byte[] data, boolean endStream) {
-        onData(data, endStream, EMPTY_HANDLER);
-    }
+    void onData(byte[] data, boolean endStream);
 
-    default void tryOnComplete() {
-        onComplete(EMPTY_HANDLER);
-    }
-
-    void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler);
-
-    void onData(byte[] data, boolean endStream, Stream.OperationHandler handler);
-
-    void onComplete(Stream.OperationHandler handler);
+    default void onComplete(){}
 
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index 913e91c..73d25f9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -27,7 +27,7 @@ public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
         final byte[] data = (byte[]) msg;
         if (clientStream != null) {
             clientStream.asTransportObserver()
-                    .tryOnData(data, false);
+                    .onData(data, false);
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index 096d412..2d2a4bd 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -60,9 +60,9 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
         Http2Headers headers = msg.headers();
         AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
         final TransportObserver observer = clientStream.asTransportObserver();
-        observer.tryOnMetadata(new Http2HeaderMeta(headers), false);
+        observer.onMetadata(new Http2HeaderMeta(headers), false);
         if (msg.isEndStream()) {
-            observer.tryOnComplete();
+            observer.onComplete();
         }
     }
 
@@ -85,7 +85,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
             final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
             // stream already closed;
             if (clientStream != null) {
-                clientStream.asTransportObserver().tryOnComplete();
+                clientStream.asTransportObserver().onComplete();
             }
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index cbe37b6..0d71cb9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -86,7 +86,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         if (msg.isEndStream()) {
             final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
             if (serverStream != null) {
-                serverStream.asTransportObserver().tryOnComplete();
+                serverStream.asTransportObserver().onComplete();
             }
         }
     }
@@ -201,9 +201,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             stream.methods(methodDescriptors);
         }
         final TransportObserver observer = stream.asTransportObserver();
-        observer.tryOnMetadata(new Http2HeaderMeta(headers), false);
+        observer.onMetadata(new Http2HeaderMeta(headers), false);
         if (msg.isEndStream()) {
-            observer.tryOnComplete();
+            observer.onComplete();
         }
 
         ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index ba66c86..8c37699 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -26,7 +26,7 @@ public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
         final byte[] data = (byte[]) msg;
         if (serverStream != null) {
             serverStream.asTransportObserver()
-                    .tryOnData(data, false);
+                    .onData(data, false);
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index d0a118f..d8916e1 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -51,7 +51,7 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
     private class UnaryClientTransportObserver extends UnaryTransportObserver implements TransportObserver {
 
         @Override
-        public void doOnComplete(OperationHandler handler) {
+        public void doOnComplete() {
             execute(() -> {
                 try {
                     AppResponse result;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index c9de0cd..ca03aa4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -55,13 +55,13 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
         }
 
         @Override
-        public void doOnComplete(OperationHandler handler) {
-            if (getData() == null) {
+        public void doOnComplete() {
+            if (getData() != null) {
+                execute(this::invoke);
+            } else {
                 onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                         .withDescription("Missing request data"));
-                return;
             }
-            execute(this::invoke);
         }
 
         public void invoke() {
@@ -108,7 +108,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
                     }
                     Metadata metadata = new DefaultMetadata();
                     metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
-                    getTransportSubscriber().tryOnMetadata(metadata, false);
+                    getTransportSubscriber().onMetadata(metadata, false);
 
                     final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
                     final byte[] data;
@@ -119,7 +119,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
                     } finally {
                         ClassLoadUtil.switchContextLoader(tccl);
                     }
-                    getTransportSubscriber().tryOnData(data, false);
+                    getTransportSubscriber().onData(data, false);
 
                     Metadata trailers = new DefaultMetadata()
                             .put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
@@ -127,7 +127,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
                     if (attachments != null) {
                         convertAttachment(trailers, attachments);
                     }
-                    getTransportSubscriber().tryOnMetadata(trailers, true);
+                    getTransportSubscriber().onMetadata(trailers, true);
                 } catch (Throwable e) {
                     LOGGER.warn("Exception processing triple message", e);
                     if (e instanceof TripleRpcException) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 3af783f..611bb98 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -55,7 +55,7 @@ class UnaryClientStreamTest {
         stream.subscribe(transportObserver);
         // no method descriptor
         Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
-        Mockito.verify(transportObserver).tryOnMetadata(any(), anyBoolean());
+        Mockito.verify(transportObserver).onMetadata(any(), anyBoolean());
 
         MethodDescriptor md = Mockito.mock(MethodDescriptor.class);
         when(md.isNeedWrap()).thenReturn(true);