You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/25 06:38:58 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] Add TransportObserver doc (#9084)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 4aaad1e  [3.0-Triple] Add TransportObserver doc (#9084)
4aaad1e is described below

commit 4aaad1e6e986ac946097f5b9abc72f04eb08ca5b
Author: GuoHao <gu...@gmail.com>
AuthorDate: Mon Oct 25 14:38:48 2021 +0800

    [3.0-Triple] Add TransportObserver doc (#9084)
    
    * Add TransportObserver doc
    
    * Refactor TransportObserver
    
    * Refactor MessageObserver
    
    * Minor bugfix
    
    * remove unused http2error
    
    * opt server stream condition
    
    * fix ServerOutboundTransportObserver
    
    * Ignore unused msg
    
    * Remove TripleUtil
    
    * Add some doc
    
    Co-authored-by: earthchen <yo...@duobei.com>
---
 .../apache/dubbo/common/stream/StreamObserver.java |  12 +-
 .../rpc/protocol/tri/AbstractClientStream.java     | 250 ++++++++++++-------
 .../rpc/protocol/tri/AbstractServerStream.java     |  61 ++++-
 .../dubbo/rpc/protocol/tri/AbstractStream.java     | 277 +++++++++++----------
 ...r.java => ClientOutboundTransportObserver.java} |  16 +-
 .../dubbo/rpc/protocol/tri/ClientStream.java       |  23 +-
 .../rpc/protocol/tri/InboundTransportObserver.java |  60 +++++
 ...bserver.java => OutboundTransportObserver.java} |  41 ++-
 ...r.java => ServerOutboundTransportObserver.java} |  20 +-
 .../dubbo/rpc/protocol/tri/ServerStream.java       |  33 +--
 .../org/apache/dubbo/rpc/protocol/tri/Stream.java  |  20 +-
 .../dubbo/rpc/protocol/tri/TransportObserver.java  |  30 ++-
 .../dubbo/rpc/protocol/tri/TransportState.java     |  16 +-
 .../protocol/tri/TripleClientInboundHandler.java   |   2 +-
 .../protocol/tri/TripleClientRequestHandler.java   |   1 -
 .../tri/TripleHttp2ClientResponseHandler.java      |  16 +-
 .../tri/TripleHttp2FrameServerHandler.java         |  21 +-
 .../dubbo/rpc/protocol/tri/TripleProtocol.java     |   3 +-
 .../tri/TripleServerConnectionHandler.java         |  24 +-
 .../protocol/tri/TripleServerInboundHandler.java   |   2 +-
 .../apache/dubbo/rpc/protocol/tri/TripleUtil.java  | 252 -------------------
 .../dubbo/rpc/protocol/tri/UnaryClientStream.java  |  81 ++++--
 ...ler.java => UnaryInboundTransportObserver.java} |  25 +-
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  |  14 +-
 .../dubbo/rpc/protocol/tri/TransportStateTest.java |   6 +
 .../rpc/protocol/tri/UnaryClientStreamTest.java    |   4 +-
 26 files changed, 672 insertions(+), 638 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java
index debafaa..7ef12ea 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/StreamObserver.java
@@ -17,16 +17,24 @@
 
 package org.apache.dubbo.common.stream;
 
+/**
+ * StreamObserver is a common streaming API. It is an observer for receiving messages.
+ * Implementations are NOT required to be thread-safe.
+ *
+ * @param <T> type of message
+ */
 public interface StreamObserver<T> {
     /**
      * onNext
-     * @param data
+     *
+     * @param data to process
      */
     void onNext(T data);
 
     /**
      * onError
-     * @param throwable
+     *
+     * @param throwable error
      */
     void onError(Throwable throwable);
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index f7dab7e..97dbe04 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -20,6 +20,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.serialize.MultipleSerialization;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.Constants;
@@ -33,15 +34,18 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.ServiceModel;
 import org.apache.dubbo.triple.TripleWrapper;
 
+import com.google.protobuf.ByteString;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2StreamChannel;
 import io.netty.util.AsciiString;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +55,9 @@ import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
 import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 
 
+/**
+ * Abstracting common actions for client streaming.
+ */
 public abstract class AbstractClientStream extends AbstractStream implements Stream {
 
     private final AsciiString scheme;
@@ -62,7 +69,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
     protected AbstractClientStream(URL url) {
         super(url);
         this.scheme = getSchemeFromUrl(url);
-        // for client cancel,send rst frame to server
         this.getCancellationContext().addListener(context -> {
             Throwable throwable = this.getCancellationContext().getCancellationCause();
             if (LOGGER.isWarnEnabled()) {
@@ -70,7 +76,9 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
                     + getConsumerModel().getServiceName() + "#" + getMethodName() +
                     " was canceled by local exception ", throwable);
             }
-            this.asTransportObserver().onReset(getHttp2Error(throwable));
+            // for client cancel,send rst frame to server
+            this.outboundTransportObserver()
+                .onError(GrpcStatus.fromCode(GrpcStatus.Code.CANCELLED).withCause(throwable));
         });
     }
 
@@ -83,6 +91,13 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         return new ClientStream(url);
     }
 
+    /**
+     * TODO move this method to somewhere else
+     *
+     * @param req        the request
+     * @param connection connection
+     * @return a client stream
+     */
     public static AbstractClientStream newClientStream(Request req, Connection connection) {
         final RpcInvocation inv = (RpcInvocation) req.getData();
         final URL url = inv.getInvoker().getUrl();
@@ -100,6 +115,32 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         return stream;
     }
 
+    private static Compressor getCompressor(URL url, ServiceModel model) {
+        String compressorStr = url.getParameter(COMPRESSOR_KEY);
+        if (compressorStr == null) {
+            // Compressor can not be set by dynamic config
+            compressorStr = ConfigurationUtils
+                .getCachedDynamicProperty(model.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+        }
+        return Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
+    }
+
+    /**
+     * Get the tri protocol special MethodDescriptor
+     */
+    private static MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
+        List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
+        if (CollectionUtils.isEmpty(methodDescriptors)) {
+            throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+        }
+        for (MethodDescriptor methodDescriptor : methodDescriptors) {
+            if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
+                return methodDescriptor;
+            }
+        }
+        throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+    }
+
     protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
         execute(() -> {
             channel.pipeline()
@@ -107,9 +148,10 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
                 .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
                 .addLast(new TripleClientInboundHandler());
             channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(this);
-            final ClientTransportObserver clientTransportObserver = new ClientTransportObserver(channel, promise);
+            final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(channel, promise);
             subscribe(clientTransportObserver);
             try {
+                DefaultFuture2.addTimeoutListener(getRequestId(), channel::close);
                 doOnStartCall();
             } catch (Throwable throwable) {
                 cancel(throwable);
@@ -125,63 +167,8 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         return new ClientStreamObserverImpl(getCancellationContext());
     }
 
-    protected class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
-
-        public ClientStreamObserverImpl(CancellationContext cancellationContext) {
-            super(cancellationContext);
-        }
-
-        @Override
-        public void onNext(Object data) {
-            if (getState().allowSendMeta()) {
-                final Metadata metadata = createRequestMeta(getRpcInvocation());
-                getTransportSubscriber().onMetadata(metadata, false);
-            }
-            if (getState().allowSendData()) {
-                final byte[] bytes = encodeRequest(data);
-                getTransportSubscriber().onData(bytes, false);
-            }
-        }
-
-        /**
-         * Handle all exceptions in the request process, other procedures directly throw
-         * <p>
-         * other procedures is {@link ClientStreamObserver#onNext(Object)} and {@link ClientStreamObserver#onCompleted()}
-         */
-        @Override
-        public void onError(Throwable throwable) {
-            if (getState().allowSendEndStream()) {
-                GrpcStatus status = GrpcStatus.getStatus(throwable);
-                transportError(status, null, getState().allowSendMeta());
-            } else {
-                if (LOGGER.isErrorEnabled()) {
-                    LOGGER.error("Triple request to "
-                        + getConsumerModel().getServiceName() + "#" + getMethodName() +
-                        " was failed by exception ", throwable);
-                }
-            }
-        }
-
-        @Override
-        public void onCompleted() {
-            if (getState().allowSendEndStream()) {
-                getTransportSubscriber().onComplete();
-            }
-        }
-
-        @Override
-        public void setCompression(String compression) {
-            if (!getState().allowSendMeta()) {
-                cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
-                return;
-            }
-            Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
-            setCompressor(compressor);
-        }
-    }
-
     @Override
-    protected void cancelByRemoteReset(Http2Error http2Error) {
+    protected void cancelByRemoteReset() {
         DefaultFuture2.getFuture(getRequestId()).cancel();
     }
 
@@ -190,18 +177,17 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         getCancellationContext().cancel(throwable);
     }
 
-
     @Override
     public void execute(Runnable runnable) {
         try {
             super.execute(runnable);
         } catch (RejectedExecutionException e) {
             LOGGER.error("Consumer's thread pool is full", e);
-            getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
+            outboundMessageSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
                 .withDescription("Consumer's thread pool is full").asException());
         } catch (Throwable t) {
             LOGGER.error("Consumer submit request to thread pool error ", t);
-            getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+            outboundMessageSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                 .withCause(t)
                 .withDescription("Consumer's error")
                 .asException());
@@ -243,11 +229,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         }
     }
 
-    private Http2Error getHttp2Error(Throwable throwable) {
-        // todo Convert the exception to http2Error
-        return Http2Error.CANCEL;
-    }
-
     public ConsumerModel getConsumerModel() {
         return consumerModel;
     }
@@ -270,17 +251,53 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         } else {
             obj = getRequestValue(value);
         }
-        out = TripleUtil.pack(obj);
+        out = pack(obj);
         return super.compress(out);
     }
 
     private TripleWrapper.TripleRequestWrapper getRequestWrapper(Object value) {
         if (getMethodDescriptor().isStream()) {
             String type = getMethodDescriptor().getParameterClasses()[0].getName();
-            return TripleUtil.wrapReq(getUrl(), getSerializeType(), value, type, getMultipleSerialization());
+            return wrapReq(getUrl(), getSerializeType(), value, type, getMultipleSerialization());
         } else {
             RpcInvocation invocation = (RpcInvocation) value;
-            return TripleUtil.wrapReq(getUrl(), invocation, getMultipleSerialization());
+            return wrapReq(getUrl(), invocation, getMultipleSerialization());
+        }
+    }
+
+    private TripleWrapper.TripleRequestWrapper wrapReq(URL url, RpcInvocation invocation,
+                                                       MultipleSerialization serialization) {
+        try {
+            String serializationName = (String) invocation.getObjectAttachment(Constants.SERIALIZATION_KEY);
+            final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
+                .setSerializeType(convertHessianToWrapper(serializationName));
+            for (int i = 0; i < invocation.getArguments().length; i++) {
+                final String clz = invocation.getParameterTypes()[i].getName();
+                builder.addArgTypes(clz);
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                serialization.serialize(url, serializationName, clz, invocation.getArguments()[i], bos);
+                builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
+            }
+            return builder.build();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to pack wrapper req", e);
+        }
+    }
+
+    public TripleWrapper.TripleRequestWrapper wrapReq(URL url, String serializeType, Object req,
+                                                      String type,
+                                                      MultipleSerialization multipleSerialization) {
+        try {
+            final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
+                .addArgTypes(type)
+                .setSerializeType(convertHessianToWrapper(serializeType));
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            multipleSerialization.serialize(url, serializeType, type, req, bos);
+            builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
+            bos.close();
+            return builder.build();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to pack wrapper req", e);
         }
     }
 
@@ -299,22 +316,36 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
                 ClassLoadUtil.switchContextLoader(getConsumerModel().getClassLoader());
             }
             if (getMethodDescriptor().isNeedWrap()) {
-                final TripleWrapper.TripleResponseWrapper wrapper = TripleUtil.unpack(data,
+                final TripleWrapper.TripleResponseWrapper wrapper = unpack(data,
                     TripleWrapper.TripleResponseWrapper.class);
-                if (!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()))) {
+                if (!getSerializeType().equals(convertHessianFromWrapper(wrapper.getSerializeType()))) {
                     throw new UnsupportedOperationException("Received inconsistent serialization type from server, " +
                         "reject to deserialize! Expected:" + getSerializeType() +
-                        " Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()));
+                        " Actual:" + convertHessianFromWrapper(wrapper.getSerializeType()));
                 }
-                return TripleUtil.unwrapResp(getUrl(), wrapper, getMultipleSerialization());
+                return unwrapResp(getUrl(), wrapper, getMultipleSerialization());
             } else {
-                return TripleUtil.unpack(data, getMethodDescriptor().getReturnClass());
+                return unpack(data, getMethodDescriptor().getReturnClass());
             }
         } finally {
             ClassLoadUtil.switchContextLoader(tccl);
         }
     }
 
+    public Object unwrapResp(URL url, TripleWrapper.TripleResponseWrapper wrap,
+                             MultipleSerialization serialization) {
+        String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
+        try {
+            final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
+            final Object ret = serialization.deserialize(url, serializeType, wrap.getType(), bais);
+            bais.close();
+            return ret;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to unwrap resp", e);
+        }
+    }
+
+
     protected Metadata createRequestMeta(RpcInvocation inv) {
         Metadata metadata = new DefaultMetadata();
         // put http2 params
@@ -347,29 +378,58 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         return "/" + inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName();
     }
 
-    private static Compressor getCompressor(URL url, ServiceModel model) {
-        String compressorStr = url.getParameter(COMPRESSOR_KEY);
-        if (compressorStr == null) {
-            // Compressor can not be set by dynamic config
-            compressorStr = ConfigurationUtils
-                .getCachedDynamicProperty(model.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+    protected class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+
+        public ClientStreamObserverImpl(CancellationContext cancellationContext) {
+            super(cancellationContext);
         }
-        return Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
-    }
 
-    /**
-     * Get the tri protocol special MethodDescriptor
-     */
-    private static MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
-        List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
-        if (CollectionUtils.isEmpty(methodDescriptors)) {
-            throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+        @Override
+        public void onNext(Object data) {
+            if (getState().allowSendMeta()) {
+                final Metadata metadata = createRequestMeta(getRpcInvocation());
+                outboundTransportObserver().onMetadata(metadata, false);
+            }
+            if (getState().allowSendData()) {
+                final byte[] bytes = encodeRequest(data);
+                outboundTransportObserver().onData(bytes, false);
+            }
         }
-        for (MethodDescriptor methodDescriptor : methodDescriptors) {
-            if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
-                return methodDescriptor;
+
+        /**
+         * Handle all exceptions in the request process, other procedures directly throw
+         * <p>
+         * other procedures is {@link ClientStreamObserver#onNext(Object)} and {@link ClientStreamObserver#onCompleted()}
+         */
+        @Override
+        public void onError(Throwable throwable) {
+            if (getState().allowSendEndStream()) {
+                GrpcStatus status = GrpcStatus.getStatus(throwable);
+                transportError(status, null, getState().allowSendMeta());
+            } else {
+                if (LOGGER.isErrorEnabled()) {
+                    LOGGER.error("Triple request to "
+                        + getConsumerModel().getServiceName() + "#" + getMethodName() +
+                        " was failed by exception ", throwable);
+                }
             }
         }
-        throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+
+        @Override
+        public void onCompleted() {
+            if (getState().allowSendEndStream()) {
+                outboundTransportObserver().onComplete();
+            }
+        }
+
+        @Override
+        public void setCompression(String compression) {
+            if (!getState().allowSendMeta()) {
+                cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+                return;
+            }
+            Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+            setCompressor(compressor);
+        }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index f566641..17dd26d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.serialize.MultipleSerialization;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.HeaderFilter;
 import org.apache.dubbo.rpc.Invoker;
@@ -30,12 +31,15 @@ import org.apache.dubbo.rpc.model.ScopeModelUtil;
 import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.triple.TripleWrapper;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -149,13 +153,13 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
                 ClassLoadUtil.switchContextLoader(getProviderModel().getServiceInterfaceClass().getClassLoader());
             }
             if (getMethodDescriptor() == null || getMethodDescriptor().isNeedWrap()) {
-                final TripleWrapper.TripleRequestWrapper wrapper = TripleUtil.unpack(data,
+                final TripleWrapper.TripleRequestWrapper wrapper = unpack(data,
                     TripleWrapper.TripleRequestWrapper.class);
-                if (!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()))) {
+                if (!getSerializeType().equals(convertHessianFromWrapper(wrapper.getSerializeType()))) {
                     transportError(GrpcStatus.fromCode(GrpcStatus.Code.INVALID_ARGUMENT)
                         .withDescription("Received inconsistent serialization type from client, " +
                             "reject to deserialize! Expected:" + getSerializeType() +
-                            " Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
+                            " Actual:" + convertHessianFromWrapper(wrapper.getSerializeType())));
                     return null;
                 }
                 if (getMethodDescriptor() == null) {
@@ -175,9 +179,9 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
                         return null;
                     }
                 }
-                return TripleUtil.unwrapReq(getUrl(), wrapper, getMultipleSerialization());
+                return unwrapReq(getUrl(), wrapper, getMultipleSerialization());
             } else {
-                return new Object[]{TripleUtil.unpack(data, getMethodDescriptor().getParameterClasses()[0])};
+                return new Object[]{unpack(data, getMethodDescriptor().getParameterClasses()[0])};
             }
         } catch (Throwable throwable) {
             LOGGER.warn("Decode request failed:", throwable);
@@ -189,6 +193,23 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         }
     }
 
+    private Object[] unwrapReq(URL url, TripleWrapper.TripleRequestWrapper wrap,
+                               MultipleSerialization multipleSerialization) {
+        String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
+        try {
+            Object[] arguments = new Object[wrap.getArgsCount()];
+            for (int i = 0; i < arguments.length; i++) {
+                final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getArgs(i).toByteArray());
+                Object obj = multipleSerialization.deserialize(url,
+                    serializeType, wrap.getArgTypes(i), bais);
+                arguments[i] = obj;
+            }
+            return arguments;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to unwrap req: " + e.getMessage(), e);
+        }
+    }
+
     /**
      * create basic meta data
      */
@@ -209,12 +230,12 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
             }
             final Message message;
             if (getMethodDescriptor().isNeedWrap()) {
-                message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
+                message = wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
                     getMultipleSerialization());
             } else {
                 message = (Message) value;
             }
-            byte[] out = TripleUtil.pack(message);
+            byte[] out = pack(message);
             return super.compress(out);
         } catch (Throwable throwable) {
             LOGGER.error("Encode Response data error ", throwable);
@@ -263,13 +284,33 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
     }
 
     @Override
-    protected void cancelByRemoteReset(Http2Error http2Error) {
+    protected void cancelByRemoteReset() {
         getCancellationContext().cancel(null);
     }
 
 
     @Override
     protected void cancelByLocal(Throwable throwable) {
-        asTransportObserver().onReset(Http2Error.CANCEL);
+        inboundTransportObserver()
+            .onError(GrpcStatus.fromCode(GrpcStatus.Code.CANCELLED)
+                .withCause(throwable));
     }
+
+    public TripleWrapper.TripleResponseWrapper wrapResp(URL url, String serializeType, Object resp,
+                                                        MethodDescriptor desc,
+                                                        MultipleSerialization multipleSerialization) {
+        try {
+            final TripleWrapper.TripleResponseWrapper.Builder builder = TripleWrapper.TripleResponseWrapper.newBuilder()
+                .setType(desc.getReturnClass().getName())
+                .setSerializeType(convertHessianToWrapper(serializeType));
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            multipleSerialization.serialize(url, serializeType, desc.getReturnClass().getName(), resp, bos);
+            builder.setData(ByteString.copyFrom(bos.toByteArray()));
+            bos.close();
+            return builder.build();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to pack wrapper req", e);
+        }
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index aaac6d8..1220ec7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -26,28 +26,37 @@ import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.config.Constants;
 import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
 
 import com.google.protobuf.Any;
 import com.google.rpc.DebugInfo;
 import com.google.rpc.Status;
-import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
+/**
+ * AbstractStream provides more detailed actions for streaming process.
+ */
 public abstract class AbstractStream implements Stream {
-    protected static final String DUPLICATED_DATA = "Duplicated data";
+
+    private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
+    private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder().withoutPadding();
 
     private final URL url;
     private final MultipleSerialization multipleSerialization;
-    private final StreamObserver<Object> streamObserver;
-    private final TransportObserver transportObserver;
+    private final StreamObserver<Object> inboundMessageObserver;
+    private final InboundTransportObserver inboundTransportObserver;
     private final Executor executor;
     private final CancellationContext cancellationContext;
     // AcceptEncoding does not change after the application is started,
@@ -57,8 +66,8 @@ public abstract class AbstractStream implements Stream {
     private MethodDescriptor methodDescriptor;
     private String methodName;
     private String serializeType;
-    private StreamObserver<Object> streamSubscriber;
-    private AbstractChannelTransportObserver transportSubscriber;
+    private StreamObserver<Object> outboundMessageSubscriber;
+    private OutboundTransportObserver outboundTransportObserver;
     private Compressor compressor = IdentityCompressor.NONE;
     private Compressor deCompressor = IdentityCompressor.NONE;
     private volatile boolean cancelled = false;
@@ -70,16 +79,53 @@ public abstract class AbstractStream implements Stream {
     protected AbstractStream(URL url, Executor executor) {
         this.url = url;
         final Executor sourceExecutor = lookupExecutor(url, executor);
+        // wrap executor to ensure linear stream message processing
         this.executor = wrapperSerializingExecutor(sourceExecutor);
         final String value = url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY);
-        this.multipleSerialization = url.getOrDefaultFrameworkModel().getExtensionLoader(MultipleSerialization.class)
+        this.multipleSerialization = url.getOrDefaultFrameworkModel()
+            .getExtensionLoader(MultipleSerialization.class)
             .getExtension(value);
         this.cancellationContext = new CancellationContext();
-        this.transportObserver = createTransportObserver();
-        this.streamObserver = createStreamObserver();
+        // A stream implementation must know how to process inbound transport messages
+        this.inboundTransportObserver = createInboundTransportObserver();
+        // A stream implementation must know how to process inbound application-level messages
+        this.inboundMessageObserver = createStreamObserver();
         this.acceptEncoding = Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel());
     }
 
+
+    /**
+     * Cancel the stream because a reset frame was received from the remote peer
+     */
+    protected abstract void cancelByRemoteReset();
+
+    /**
+     * Cancel the stream locally because of an error
+     *
+     * @param throwable the cancel cause
+     */
+    protected abstract void cancelByLocal(Throwable throwable);
+
+    /**
+     * Create the StreamObserver that processes inbound application-level messages
+     */
+    protected abstract StreamObserver<Object> createStreamObserver();
+
+    /**
+     * Create the InboundTransportObserver that processes inbound transport messages
+     */
+    protected abstract InboundTransportObserver createInboundTransportObserver();
+
+    private void closeQuietly(Closeable c) {
+        if (c != null) {
+            try {
+                c.close();
+            } catch (IOException ignore) {
+                // ignored
+            }
+        }
+    }
+
     private Executor lookupExecutor(URL url, Executor executor) {
         // only server maybe not null
         if (executor != null) {
@@ -104,7 +150,7 @@ public abstract class AbstractStream implements Stream {
     }
 
     public TransportState getState() {
-        return transportSubscriber.state;
+        return outboundTransportObserver.state;
     }
 
     public boolean isCancelled() {
@@ -147,32 +193,16 @@ public abstract class AbstractStream implements Stream {
 
     private void cancel() {
         cancelled = true;
-        execute(RpcContext::removeCancellationContext);
     }
 
     /**
      * remote cancel
-     *
-     * @param http2Error {@link Http2Error}
      */
-    protected final void cancelByRemote(Http2Error http2Error) {
+    protected final void cancelByRemote() {
         cancel();
-        cancelByRemoteReset(http2Error);
+        cancelByRemoteReset();
     }
 
-    protected abstract void cancelByRemoteReset(Http2Error http2Error);
-
-    protected abstract void cancelByLocal(Throwable throwable);
-
-    /**
-     * create request StreamObserver
-     */
-    protected abstract StreamObserver<Object> createStreamObserver();
-
-    /**
-     * create response TransportObserver
-     */
-    protected abstract TransportObserver createTransportObserver();
 
     public String getSerializeType() {
         return serializeType;
@@ -190,12 +220,12 @@ public abstract class AbstractStream implements Stream {
         return multipleSerialization;
     }
 
-    public StreamObserver<Object> getStreamSubscriber() {
-        return streamSubscriber;
+    public StreamObserver<Object> outboundMessageSubscriber() {
+        return outboundMessageSubscriber;
     }
 
-    public TransportObserver getTransportSubscriber() {
-        return transportSubscriber;
+    public OutboundTransportObserver outboundTransportObserver() {
+        return outboundTransportObserver;
     }
 
     public MethodDescriptor getMethodDescriptor() {
@@ -250,23 +280,22 @@ public abstract class AbstractStream implements Stream {
     }
 
     @Override
-    public void subscribe(StreamObserver<Object> observer) {
-        this.streamSubscriber = observer;
+    public void subscribe(StreamObserver<Object> outboundMessageObserver) {
+        this.outboundMessageSubscriber = outboundMessageObserver;
     }
 
     @Override
-    public void subscribe(AbstractChannelTransportObserver observer) {
-        this.transportSubscriber = observer;
+    public void subscribe(OutboundTransportObserver observer) {
+        this.outboundTransportObserver = observer;
     }
 
-    @Override
-    public StreamObserver<Object> asStreamObserver() {
-        return streamObserver;
+    public StreamObserver<Object> inboundMessageObserver() {
+        return inboundMessageObserver;
     }
 
     @Override
-    public TransportObserver asTransportObserver() {
-        return transportObserver;
+    public TransportObserver inboundTransportObserver() {
+        return inboundTransportObserver;
     }
 
     // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
@@ -274,31 +303,22 @@ public abstract class AbstractStream implements Stream {
         if (!onlyTrailers) {
             // set metadata
             Metadata metadata = new DefaultMetadata();
-            getTransportSubscriber().onMetadata(metadata, false);
+            outboundTransportObserver().onMetadata(metadata, false);
         }
         // set trailers
         Metadata trailers = getTrailers(status);
         if (attachments != null) {
             convertAttachment(trailers, attachments);
         }
-        getTransportSubscriber().onMetadata(trailers, true);
+        outboundTransportObserver().onMetadata(trailers, true);
         if (LOGGER.isErrorEnabled()) {
             LOGGER.error("[Triple-Error] status=" + status.code.code
                 + " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
         }
     }
 
-    protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
-        transportError(status, attachments, false);
-    }
-
     protected void transportError(GrpcStatus status) {
-        transportError(status, null);
-    }
-
-    protected void transportError(Throwable throwable) {
-        GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable, throwable.getMessage());
-        transportError(status, null);
+        transportError(status, null, false);
     }
 
     private String getGrpcMessage(GrpcStatus status) {
@@ -322,7 +342,7 @@ public abstract class AbstractStream implements Stream {
         if (throwable == null) {
             Status status = builder.build();
             metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
-                TripleUtil.encodeBase64ASCII(status.toByteArray()));
+                encodeBase64ASCII(status.toByteArray()));
             return metadata;
         }
         DebugInfo debugInfo = DebugInfo.newBuilder()
@@ -333,10 +353,16 @@ public abstract class AbstractStream implements Stream {
         builder.addDetails(Any.pack(debugInfo));
         Status status = builder.build();
         metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
-            TripleUtil.encodeBase64ASCII(status.toByteArray()));
+            encodeBase64ASCII(status.toByteArray()));
         return metadata;
     }
 
+    /**
+     * Parse metadata to a KV pairs map.
+     *
+     * @param metadata the metadata from remote
+     * @return KV pairs map
+     */
     protected Map<String, Object> parseMetadataToAttachmentMap(Metadata metadata) {
         Map<String, Object> attachments = new HashMap<>();
         for (Map.Entry<CharSequence, CharSequence> header : metadata) {
@@ -350,7 +376,7 @@ public abstract class AbstractStream implements Stream {
             }
             if (key.endsWith(TripleConstant.GRPC_BIN_SUFFIX) && key.length() > 4) {
                 try {
-                    attachments.put(key.substring(0, key.length() - 4), TripleUtil.decodeASCIIByte(header.getValue()));
+                    attachments.put(key.substring(0, key.length() - 4), decodeASCIIByte(header.getValue()));
                 } catch (Exception e) {
                     LOGGER.error("Failed to parse response attachment key=" + key, e);
                 }
@@ -361,6 +387,13 @@ public abstract class AbstractStream implements Stream {
         return attachments;
     }
 
+    /**
+     * Parse and put the KV pairs into metadata. Ignore Http2 PseudoHeaderName and internal name.
+     * Only raw byte array or string value will be put.
+     *
+     * @param metadata    the metadata holder
+     * @param attachments KV pairs
+     */
     protected void convertAttachment(Metadata metadata, Map<String, Object> attachments) {
         if (attachments == null) {
             return;
@@ -391,7 +424,7 @@ public abstract class AbstractStream implements Stream {
                 String str = (String) v;
                 metadata.put(key, str);
             } else if (v instanceof byte[]) {
-                String str = TripleUtil.encodeBase64ASCII((byte[]) v);
+                String str = encodeBase64ASCII((byte[]) v);
                 metadata.put(key + TripleConstant.GRPC_BIN_SUFFIX, str);
             }
         } catch (Throwable t) {
@@ -399,97 +432,73 @@ public abstract class AbstractStream implements Stream {
         }
     }
 
-    protected byte[] compress(byte[] data) {
-        return this.getCompressor().compress(data);
+    protected String convertHessianFromWrapper(String serializeType) {
+        if (TripleConstant.HESSIAN4.equals(serializeType)) {
+            return TripleConstant.HESSIAN2;
+        }
+        return serializeType;
     }
 
-    protected byte[] decompress(byte[] data) {
-        return this.getDeCompressor().decompress(data);
+    protected <T> T unpack(byte[] data, Class<T> clz) {
+        return unpack(new ByteArrayInputStream(data), clz);
     }
 
-    protected abstract class AbstractTransportObserver implements TransportObserver {
-        private Metadata headers;
-        private Metadata trailers;
-
-        public Metadata getHeaders() {
-            return headers;
-        }
-
-        public Metadata getTrailers() {
-            return trailers;
+    protected <T> T unpack(InputStream is, Class<T> clz) {
+        try {
+            final T req = SingleProtobufUtils.deserialize(is, clz);
+            is.close();
+            return req;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to unpack req", e);
+        } finally {
+            closeQuietly(is);
         }
+    }
 
-        @Override
-        public void onReset(Http2Error http2Error) {
-            if (getState().allowSendReset()) {
-                getState().setResetSend();
-                getTransportSubscriber().onReset(http2Error);
-            }
+    protected byte[] pack(Object obj) {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            SingleProtobufUtils.serialize(obj, baos);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to pack protobuf object", e);
         }
+        return baos.toByteArray();
+    }
 
-        @Override
-        public void onMetadata(Metadata metadata, boolean endStream) {
-            if (headers == null) {
-                headers = metadata;
-            } else {
-                trailers = metadata;
-            }
-        }
 
-        protected GrpcStatus extractStatusFromMeta(Metadata metadata) {
-            if (metadata.contains(TripleHeaderEnum.STATUS_KEY.getHeader())) {
-                final int code = Integer.parseInt(metadata.get(TripleHeaderEnum.STATUS_KEY.getHeader()).toString());
-
-                if (!GrpcStatus.Code.isOk(code)) {
-                    GrpcStatus status = GrpcStatus.fromCode(code);
-                    if (metadata.contains(TripleHeaderEnum.MESSAGE_KEY.getHeader())) {
-                        final String raw = metadata.get(TripleHeaderEnum.MESSAGE_KEY.getHeader()).toString();
-                        status = status.withDescription(GrpcStatus.fromMessage(raw));
-                    }
-                    return status;
-                }
-                return GrpcStatus.fromCode(Code.OK);
-            }
-            return GrpcStatus.fromCode(Code.OK);
-        }
+    protected String encodeBase64ASCII(byte[] in) {
+        byte[] bytes = encodeBase64(in);
+        return new String(bytes, StandardCharsets.US_ASCII);
+    }
 
+    protected byte[] encodeBase64(byte[] in) {
+        return BASE64_ENCODER.encode(in);
     }
 
-    protected abstract class UnaryTransportObserver extends AbstractTransportObserver implements TransportObserver {
-        private byte[] data;
+    protected byte[] decodeASCIIByte(CharSequence value) {
+        return BASE64_DECODER.decode(value.toString().getBytes(StandardCharsets.US_ASCII));
+    }
 
-        public byte[] getData() {
-            return data;
+    /**
+     * Convert hessian version from Dubbo's SPI version(hessian2) to wrapper API version (hessian4)
+     *
+     * @param serializeType literal type
+     * @return hessian4 if the param is hessian2, otherwise return the param
+     */
+    protected String convertHessianToWrapper(String serializeType) {
+        if (TripleConstant.HESSIAN2.equals(serializeType)) {
+            return TripleConstant.HESSIAN4;
         }
+        return serializeType;
+    }
 
-        protected abstract void onError(GrpcStatus status);
-
-        @Override
-        public void onComplete() {
-            execute(() -> {
-                final GrpcStatus status = extractStatusFromMeta(getHeaders());
-                if (Code.isOk(status.code.code)) {
-                    doOnComplete();
-                } else {
-                    onError(status);
-                }
-            });
-        }
+    protected byte[] compress(byte[] data) {
+        return this.getCompressor().compress(data);
+    }
 
-        /**
-         * This method exception needs to be caught by the implementation class
-         */
-        protected abstract void doOnComplete();
+    protected byte[] decompress(byte[] data) {
+        return this.getDeCompressor().decompress(data);
+    }
 
 
-        @Override
-        public void onData(byte[] in, boolean endStream) {
-            if (data == null) {
-                this.data = in;
-            } else {
-                onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withDescription(DUPLICATED_DATA));
-            }
-        }
-    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
similarity index 86%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
index 1a53881..c939ae5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientOutboundTransportObserver.java
@@ -17,9 +17,6 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
@@ -30,13 +27,16 @@ import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2StreamChannel;
 
-public class ClientTransportObserver extends AbstractChannelTransportObserver {
+/**
+ * Sends stream data to the remote peer.
+ * {@link ClientOutboundTransportObserver#promise} will be set to success after a reset or completion has been sent.
+ */
+public class ClientOutboundTransportObserver extends OutboundTransportObserver {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClientTransportObserver.class);
     private final ChannelPromise promise;
     private final Http2StreamChannel streamChannel;
 
-    public ClientTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
+    public ClientOutboundTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
         this.streamChannel = channel;
         this.promise = promise;
     }
@@ -68,8 +68,8 @@ public class ClientTransportObserver extends AbstractChannelTransportObserver {
     }
 
     @Override
-    protected void doOnReset(Http2Error http2Error) {
-        streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+    protected void doOnError(GrpcStatus status) {
+        streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
             .addListener(future -> {
                 if (future.isSuccess()) {
                     promise.trySuccess();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index b868d87..a41b739 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -31,8 +31,8 @@ public class ClientStream extends AbstractClientStream implements Stream {
     }
 
     @Override
-    protected TransportObserver createTransportObserver() {
-        return new ClientTransportObserverImpl();
+    protected InboundTransportObserver createInboundTransportObserver() {
+        return new ClientStreamInboundTransportObserverImpl();
     }
 
     @Override
@@ -47,8 +47,8 @@ public class ClientStream extends AbstractClientStream implements Stream {
         StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[1];
         obServer = attachCancelContext(obServer, getCancellationContext());
         subscribe(obServer);
-        asStreamObserver().onNext(getRpcInvocation().getArguments()[0]);
-        asStreamObserver().onCompleted();
+        inboundMessageObserver().onNext(getRpcInvocation().getArguments()[0]);
+        inboundMessageObserver().onCompleted();
         return new AppResponse();
     }
 
@@ -56,7 +56,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
         StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[0];
         obServer = attachCancelContext(obServer, getCancellationContext());
         subscribe(obServer);
-        return new AppResponse(asStreamObserver());
+        return new AppResponse(inboundMessageObserver());
     }
 
     private <T> StreamObserver<T> attachCancelContext(StreamObserver<T> observer, CancellationContext context) {
@@ -68,7 +68,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
         return observer;
     }
 
-    private class ClientTransportObserverImpl extends AbstractTransportObserver {
+    private class ClientStreamInboundTransportObserverImpl extends InboundTransportObserver {
 
         private boolean error = false;
 
@@ -77,7 +77,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
             execute(() -> {
                 try {
                     final Object resp = deserializeResponse(data);
-                    getStreamSubscriber().onNext(resp);
+                    outboundMessageSubscriber().onNext(resp);
                 } catch (Throwable throwable) {
                     onError(throwable);
                 }
@@ -85,12 +85,17 @@ public class ClientStream extends AbstractClientStream implements Stream {
         }
 
         @Override
+        public void onError(GrpcStatus status) {
+            onError(status.asException());
+        }
+
+        @Override
         public void onComplete() {
             execute(() -> {
                 getState().setServerEndStreamReceived();
                 final GrpcStatus status = extractStatusFromMeta(getHeaders());
                 if (GrpcStatus.Code.isOk(status.code.code)) {
-                    getStreamSubscriber().onCompleted();
+                    outboundMessageSubscriber().onCompleted();
                 } else {
                     onError(status.cause);
                 }
@@ -105,7 +110,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
             if (!getState().serverSendStreamReceived()) {
                 cancel(throwable);
             }
-            getStreamSubscriber().onError(throwable);
+            outboundMessageSubscriber().onError(throwable);
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/InboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/InboundTransportObserver.java
new file mode 100644
index 0000000..43050f1
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/InboundTransportObserver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
+
+public abstract class InboundTransportObserver implements TransportObserver {
+    private Metadata headers;
+    private Metadata trailers;
+
+    public Metadata getHeaders() {
+        return headers;
+    }
+
+    public Metadata getTrailers() {
+        return trailers;
+    }
+
+    @Override
+    public void onMetadata(Metadata metadata, boolean endStream) {
+        if (headers == null) {
+            headers = metadata;
+        } else {
+            trailers = metadata;
+        }
+    }
+
+    protected GrpcStatus extractStatusFromMeta(Metadata metadata) {
+        if (!metadata.contains(TripleHeaderEnum.STATUS_KEY.getHeader())) {
+            return GrpcStatus.fromCode(Code.OK);
+        }
+        final int code = Integer.parseInt(metadata.get(TripleHeaderEnum.STATUS_KEY.getHeader()).toString());
+
+        if (Code.isOk(code)) {
+            return GrpcStatus.fromCode(Code.OK);
+        }
+        GrpcStatus status = GrpcStatus.fromCode(code);
+        if (!metadata.contains(TripleHeaderEnum.MESSAGE_KEY.getHeader())) {
+            return status;
+        }
+        final String raw = metadata.get(TripleHeaderEnum.MESSAGE_KEY.getHeader()).toString();
+        status = status.withDescription(GrpcStatus.fromMessage(raw));
+        return status;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
similarity index 61%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
index 47c24f3..4194b4f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/OutboundTransportObserver.java
@@ -17,38 +17,63 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.handler.codec.http2.Http2Error;
-
-public abstract class AbstractChannelTransportObserver implements TransportObserver {
+/**
+ * Provides loose state management for writing messages to the outbound side.
+ */
+public abstract class OutboundTransportObserver implements TransportObserver {
 
     protected final TransportState state = new TransportState();
 
     @Override
     public void onMetadata(Metadata metadata, boolean endStream) {
+        checkSendMeta(metadata, endStream);
+        doOnMetadata(metadata, endStream);
+    }
+
+    protected void checkSendMeta(Object metadata, boolean endStream) {
         if (endStream) {
+            if (!state.allowSendEndStream()) {
+                throw new IllegalStateException("Metadata endStream already sent to peer, send " + metadata + " failed!");
+            }
             state.setEndStreamSend();
         } else {
+            if (!state.allowSendMeta()) {
+                throw new IllegalStateException("Metadata already sent to peer, send " + metadata + " failed!");
+            }
             state.setMetaSend();
         }
-        doOnMetadata(metadata, endStream);
     }
 
     @Override
     public void onData(byte[] data, boolean endStream) {
+        checkSendData(endStream);
+        doOnData(data, endStream);
+    }
+
+
+    protected void checkSendData(boolean endStream) {
+        if (!state.allowSendData()) {
+            throw new IllegalStateException("data has not sent to peer!");
+        }
         if (endStream) {
             state.setEndStreamSend();
         }
-        doOnData(data, endStream);
     }
 
     @Override
-    public void onReset(Http2Error http2Error) {
+    public void onError(GrpcStatus status) {
+        if (!state.allowSendReset()) {
+            throw new IllegalStateException("Duplicated rst!");
+        }
         state.setResetSend();
-        doOnReset(http2Error);
+        doOnError(status);
     }
 
     @Override
     public void onComplete() {
+        if (!state.allowSendEndStream()) {
+            throw new IllegalStateException("Stream already closed!");
+        }
         state.setEndStreamSend();
         doOnComplete();
     }
@@ -58,7 +83,7 @@ public abstract class AbstractChannelTransportObserver implements TransportObser
 
     protected abstract void doOnData(byte[] data, boolean endStream);
 
-    protected abstract void doOnReset(Http2Error http2Error);
+    protected abstract void doOnError(GrpcStatus status);
 
     protected abstract void doOnComplete();
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
similarity index 88%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
index 0e09ad4..4f74ec9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerOutboundTransportObserver.java
@@ -30,22 +30,18 @@ import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
 import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 
-public class ServerTransportObserver extends AbstractChannelTransportObserver {
+public class ServerOutboundTransportObserver extends OutboundTransportObserver {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ServerTransportObserver.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerOutboundTransportObserver.class);
 
     private final ChannelHandlerContext ctx;
 
-    public ServerTransportObserver(ChannelHandlerContext ctx) {
+    public ServerOutboundTransportObserver(ChannelHandlerContext ctx) {
         this.ctx = ctx;
     }
 
     public void onMetadata(Http2Headers headers, boolean endStream) {
-        if (endStream) {
-            state.setEndStreamSend();
-        } else {
-            state.setMetaSend();
-        }
+        checkSendMeta(headers, endStream);
         ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
             .addListener(future -> {
                 if (!future.isSuccess()) {
@@ -81,8 +77,8 @@ public class ServerTransportObserver extends AbstractChannelTransportObserver {
     }
 
     @Override
-    protected void doOnReset(Http2Error http2Error) {
-        ctx.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+    protected void doOnError(GrpcStatus status) {
+        ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.CANCEL))
             .addListener(future -> {
                 if (!future.isSuccess()) {
                     LOGGER.warn("write reset error", future.cause());
@@ -101,9 +97,7 @@ public class ServerTransportObserver extends AbstractChannelTransportObserver {
     }
 
     public void onData(ByteBuf buf, boolean endStream) {
-        if (endStream) {
-            state.setEndStreamSend();
-        }
+        checkSendData(endStream);
         ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
             .addListener(future -> {
                 if (!future.isSuccess()) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 86a811d..5623b33 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -35,8 +35,8 @@ public class ServerStream extends AbstractServerStream implements Stream {
     }
 
     @Override
-    protected TransportObserver createTransportObserver() {
-        return new StreamTransportObserver();
+    protected InboundTransportObserver createInboundTransportObserver() {
+        return new ServerStreamInboundTransportObserver();
     }
 
     private class ServerStreamObserverImpl implements ServerStreamObserver<Object> {
@@ -44,14 +44,14 @@ public class ServerStream extends AbstractServerStream implements Stream {
         @Override
         public void onNext(Object data) {
             if (getState().allowSendMeta()) {
-                getTransportSubscriber().onMetadata(createResponseMeta(), false);
+                outboundTransportObserver().onMetadata(createResponseMeta(), false);
             }
             final byte[] bytes = encodeResponse(data);
             if (bytes == null) {
                 return;
             }
             if (getState().allowSendData()) {
-                getTransportSubscriber().onData(bytes, false);
+                outboundTransportObserver().onData(bytes, false);
             }
         }
 
@@ -71,7 +71,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
             if (!getState().allowSendEndStream()) {
                 return;
             }
-            getTransportSubscriber().onMetadata(TripleConstant.SUCCESS_RESPONSE_META, true);
+            outboundTransportObserver().onMetadata(TripleConstant.SUCCESS_RESPONSE_META, true);
         }
 
         @Override
@@ -87,7 +87,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
         }
     }
 
-    private class StreamTransportObserver extends AbstractTransportObserver implements TransportObserver {
+    private class ServerStreamInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
 
         /**
          * for server stream the method only save header
@@ -114,7 +114,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
                 try {
                     RpcContext.restoreCancellationContext(getCancellationContext());
                     final RpcInvocation inv = buildInvocation(metadata);
-                    inv.setArguments(new Object[]{asStreamObserver()});
+                    inv.setArguments(new Object[]{inboundMessageObserver()});
                     final Result result = getInvoker().invoke(inv);
                     if (result.hasException()) {
                         transportError(GrpcStatus.getStatus(result.getException()));
@@ -137,7 +137,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
         public void onData(byte[] in, boolean endStream) {
             execute(() -> {
                 try {
-                    if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
+                    if (getMethodDescriptor().isServerStream()) {
                         serverStreamOnData(in);
                         return;
                     }
@@ -151,12 +151,19 @@ public class ServerStream extends AbstractServerStream implements Stream {
         }
 
         /**
+         * This method is not expected to be called for now
+         */
+        @Override
+        public void onError(GrpcStatus status) {
+        }
+
+        /**
          * call observer onNext
          */
         private void biStreamOnData(byte[] in) {
             final Object[] arguments = deserializeRequest(in);
             if (arguments != null) {
-                getStreamSubscriber().onNext(arguments[0]);
+                outboundMessageSubscriber().onNext(arguments[0]);
             }
         }
 
@@ -177,7 +184,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
                 RpcInvocation inv = buildInvocation(getHeaders());
                 final Object[] arguments = deserializeRequest(in);
                 if (arguments != null) {
-                    inv.setArguments(new Object[]{arguments[0], asStreamObserver()});
+                    inv.setArguments(new Object[]{arguments[0], inboundMessageObserver()});
                     final Result result = getInvoker().invoke(inv);
                     if (result.hasException()) {
                         transportError(GrpcStatus.getStatus(result.getException()));
@@ -195,12 +202,10 @@ public class ServerStream extends AbstractServerStream implements Stream {
          */
         @Override
         public void onComplete() {
-            if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
+            if (getMethodDescriptor().isServerStream()) {
                 return;
             }
-            execute(() -> {
-                getStreamSubscriber().onCompleted();
-            });
+            execute(() -> outboundMessageSubscriber().onCompleted());
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
index 358cc33..890c299 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
@@ -21,10 +21,10 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
 
 /**
- * Stream acts as a bi-directional intermediate layer for streaming data processing. It serializes object instance to
- * byte[] then send to remote, and deserializes byte[] to object instance from remote. {@link #asTransportObserver()}
- * and {@link #subscribe(AbstractChannelTransportObserver)} provide {@link TransportObserver} to send or receive remote data.
- * {@link #asStreamObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
+ * Stream acts as a bi-directional intermediate layer for processing streaming data. It serializes object instances to
+ * byte[] and sends them to remote, and deserializes byte[] from remote into object instances. {@link #inboundTransportObserver()}
+ * and {@link #subscribe(OutboundTransportObserver)} provide {@link TransportObserver} to receive or send remote data.
+ * {@link #inboundMessageObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
  * as API for users fetching/emitting objects from/to remote peer.
  */
 public interface Stream {
@@ -36,28 +36,28 @@ public interface Stream {
      *
      * @param observer receives remote byte[] data
      */
-    void subscribe(AbstractChannelTransportObserver observer);
+    void subscribe(OutboundTransportObserver observer);
 
     /**
      * Get a downstream data observer for writing byte[] data to this stream
      *
      * @return an observer for writing byte[] to remote peer
      */
-    TransportObserver asTransportObserver();
+    TransportObserver inboundTransportObserver();
 
     /**
-     * Register an upstream data observer to receive byte[] sent by this stream
+     * Register an upstream data observer to receive instances sent by this stream
      *
-     * @param observer receives remote byte[] data
+     * @param outboundMessageObserver receives instances sent by this stream
      */
-    void subscribe(StreamObserver<Object> observer);
+    void subscribe(StreamObserver<Object> outboundMessageObserver);
 
     /**
      * Get a downstream data observer for transmitting instances to application code
      *
      * @return an observer for writing byte[] to remote peer
      */
-    StreamObserver<Object> asStreamObserver();
+    StreamObserver<Object> inboundMessageObserver();
 
     /**
      * Execute a task in stream's executor
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 7e5bc8f..5beebb7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -17,16 +17,40 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.handler.codec.http2.Http2Error;
-
+/**
+ * An observer used for transport messaging which provides full streaming support.
+ * A TransportObserver receives raw data or control messages from local/remote.
+ * Implementations should prefer to extend {@link OutboundTransportObserver} and {@link InboundTransportObserver}
+ * instead of this interface.
+ */
 public interface TransportObserver {
 
+    /**
+     * Transport metadata
+     *
+     * @param metadata  metadata KV pairs
+     * @param endStream whether this data should terminate the stream
+     */
     void onMetadata(Metadata metadata, boolean endStream);
 
+    /**
+     * Transport data
+     *
+     * @param data      raw byte array
+     * @param endStream whether this data should terminate the stream
+     */
     void onData(byte[] data, boolean endStream);
 
-    void onReset(Http2Error http2Error);
+    /**
+     * Error
+     *
+     * @param status error status
+     */
+    void onError(GrpcStatus status);
 
+    /**
+     * Set stream completed
+     */
     void onComplete();
 
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
index 4160fd6..4f922d3 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
@@ -17,25 +17,21 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
+/**
+ * Records the state of a stream.
+ * A normal state transition:
+ * Meta -> (EndStream) -> Data -> (EndStream) -> (Rst)
+ */
 public class TransportState {
 
-    private volatile int state = 0;
     private static final int META_SEND = 0b00000000000000000000000000000001;
     private static final int RESET_SEND = 0b00000000000000000000000000000010;
     private static final int END_STREAM_SEND = 0b00000000000000000000000000000100;
     private static final int SERVER_SEND_STREAM_RECEIVED = 0b00000000000000000000000000001000;
-
     private static final int ALLOW_META_SEND = 0b00000000000000000000000000000000;
     private static final int ALLOW_DATA_SEND = META_SEND;
     private static final int ALLOW_END_STREAM_SEND = META_SEND;
-    private static final int ALLOW_RESET_SEND = 0b00000000000000000000000000000001;
-
-    public TransportState() {
-    }
-
-    public void setState(int state) {
-        this.state = state;
-    }
+    private volatile int state = 0;
 
     public void setMetaSend() {
         this.state = this.state | META_SEND;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index 377b093..007f235 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -26,7 +26,7 @@ public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
 
         final byte[] data = (byte[]) msg;
         if (clientStream != null) {
-            clientStream.asTransportObserver()
+            clientStream.inboundTransportObserver()
                 .onData(data, false);
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index f215c27..ba2da9a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -45,7 +45,6 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
     }
 
     private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
-        DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
         Connection connection = Connection.getConnectionFromChannel(ctx.channel());
         final AbstractClientStream stream = AbstractClientStream.newClientStream(req, connection);
         final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index 0a35d2b..eb14bee 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2GoAwayFrame;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2HeadersFrame;
@@ -32,7 +31,7 @@ import io.netty.handler.codec.http2.Http2StreamFrame;
 import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 
 public final class TripleHttp2ClientResponseHandler extends SimpleChannelInboundHandler<Http2StreamFrame> {
-    private static final Logger logger = LoggerFactory.getLogger(TripleHttp2ClientResponseHandler.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(TripleHttp2ClientResponseHandler.class);
 
     public TripleHttp2ClientResponseHandler() {
         super(false);
@@ -44,7 +43,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
         if (evt instanceof Http2GoAwayFrame) {
             Http2GoAwayFrame event = (Http2GoAwayFrame) evt;
             ctx.close();
-            logger.debug(
+            LOGGER.debug(
                 "Event triggered, event name is: " + event.name() + ", last stream id is: " + event.lastStreamId());
         } else if (evt instanceof Http2ResetFrame) {
             onResetRead(ctx, (Http2ResetFrame) evt);
@@ -64,7 +63,8 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
 
     private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame) {
         final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
-        clientStream.cancelByRemote(Http2Error.valueOf(resetFrame.errorCode()));
+        LOGGER.warn("Triple Client received remote reset errorCode=" + resetFrame.errorCode());
+        clientStream.cancelByRemote();
         ctx.close();
     }
 
@@ -85,7 +85,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
                 }
             }
         }
-        final TransportObserver observer = clientStream.asTransportObserver();
+        final TransportObserver observer = clientStream.inboundTransportObserver();
         observer.onMetadata(new Http2HeaderMeta(headers), false);
         if (msg.isEndStream()) {
             observer.onComplete();
@@ -100,8 +100,8 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
         Metadata metadata = new DefaultMetadata();
         metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(status.code.code));
         metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
-        logger.warn("Meet Exception on ClientResponseHandler, status code is: " + status.code, cause);
-        clientStream.asStreamObserver().onError(status.asException());
+        LOGGER.warn("Meet Exception on ClientResponseHandler, status code is: " + status.code, cause);
+        clientStream.inboundMessageObserver().onError(status.asException());
         ctx.close();
     }
 
@@ -111,7 +111,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
             final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
             // stream already closed;
             if (clientStream != null) {
-                clientStream.asTransportObserver().onComplete();
+                clientStream.inboundTransportObserver().onComplete();
             }
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 46207bb..8968d0d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -38,12 +38,11 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
 import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Frame;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2HeadersFrame;
 import io.netty.handler.codec.http2.Http2ResetFrame;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 
 import java.util.List;
 
@@ -66,11 +65,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             onHeadersRead(ctx, (Http2HeadersFrame) msg);
         } else if (msg instanceof Http2DataFrame) {
             onDataRead(ctx, (Http2DataFrame) msg);
-        } else if (msg instanceof Http2Frame) {
+        } else if (msg instanceof ReferenceCounted) {
             // ignored
             ReferenceCountUtil.release(msg);
-        } else {
-            super.channelRead(ctx, msg);
         }
     }
 
@@ -84,9 +81,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
     }
 
     public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
-        Http2Error http2Error = Http2Error.valueOf(frame.errorCode());
         final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
-        serverStream.cancelByRemote(http2Error);
+        LOGGER.warn("Triple Server received remote reset errorCode=" + frame.errorCode());
+        serverStream.cancelByRemote();
         ctx.close();
     }
 
@@ -106,7 +103,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         if (msg.isEndStream()) {
             final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
             if (serverStream != null) {
-                serverStream.asTransportObserver().onComplete();
+                serverStream.inboundTransportObserver().onComplete();
             }
         }
     }
@@ -126,7 +123,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
     public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
         final Http2Headers headers = msg.headers();
-        ServerTransportObserver transportObserver = new ServerTransportObserver(ctx);
+        ServerOutboundTransportObserver transportObserver = new ServerOutboundTransportObserver(ctx);
 
         if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
             responsePlainTextError(transportObserver, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
@@ -243,7 +240,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             stream.methods(methodDescriptors);
         }
 
-        final TransportObserver observer = stream.asTransportObserver();
+        final TransportObserver observer = stream.inboundTransportObserver();
         observer.onMetadata(new Http2HeaderMeta(headers), false);
         if (msg.isEndStream()) {
             observer.onComplete();
@@ -261,7 +258,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
     }
 
-    private void responsePlainTextError(ServerTransportObserver observer, int code, GrpcStatus status) {
+    private void responsePlainTextError(ServerOutboundTransportObserver observer, int code, GrpcStatus status) {
         Http2Headers headers = new DefaultHttp2Headers(true)
             .status(String.valueOf(code))
             .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
@@ -271,7 +268,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         observer.onData(status.description, true);
     }
 
-    private void responseErr(ServerTransportObserver observer, GrpcStatus status) {
+    private void responseErr(ServerOutboundTransportObserver observer, GrpcStatus status) {
         Http2Headers trailers = new DefaultHttp2Headers()
             .status(OK.codeAsText())
             .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 940a824..d2b124a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -16,7 +16,6 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import grpc.health.v1.HealthCheckResponse;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
@@ -33,6 +32,8 @@ import org.apache.dubbo.rpc.protocol.AbstractExporter;
 import org.apache.dubbo.rpc.protocol.AbstractProtocol;
 import org.apache.dubbo.rpc.protocol.tri.service.TriBuiltinService;
 
+import grpc.health.v1.HealthCheckResponse;
+
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
 import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
index a3156f3..a4d7787 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
@@ -26,10 +26,25 @@ import io.netty.handler.codec.http2.Http2GoAwayFrame;
 import io.netty.handler.codec.http2.Http2PingFrame;
 import io.netty.util.ReferenceCountUtil;
 
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.HashSet;
+import java.util.Set;
+
 import static org.apache.dubbo.rpc.protocol.tri.GracefulShutdown.GRACEFUL_SHUTDOWN_PING;
 
 public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
     private static final Logger logger = LoggerFactory.getLogger(TripleServerConnectionHandler.class);
+    // Some exceptions are not very useful and add too much noise to the log
+    private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
+    private static final Set<Class<?>> QUIET_EXCEPTIONS_CLASS = new HashSet<>();
+
+    static {
+        QUIET_EXCEPTIONS.add("NativeIoException");
+        QUIET_EXCEPTIONS_CLASS.add(IOException.class);
+        QUIET_EXCEPTIONS_CLASS.add(SocketException.class);
+    }
+
     private GracefulShutdown gracefulShutdown;
 
     @Override
@@ -50,6 +65,13 @@ public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
         }
     }
 
+    private boolean isQuiteException(Throwable t) {
+        if (QUIET_EXCEPTIONS_CLASS.contains(t.getClass())) {
+            return true;
+        }
+        return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
+    }
+
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         super.userEventTriggered(ctx, evt);
@@ -58,7 +80,7 @@ public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         // this may be change in future follow https://github.com/apache/dubbo/pull/8644
-        if (TripleUtil.isQuiteException(cause)) {
+        if (isQuiteException(cause)) {
             if (logger.isDebugEnabled()) {
                 logger.debug(String.format("Channel:%s Error", ctx.channel()), cause);
             }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 6728b09..7046bf2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -25,7 +25,7 @@ public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
         final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
         final byte[] data = (byte[]) msg;
         if (serverStream != null) {
-            serverStream.asTransportObserver()
+            serverStream.inboundTransportObserver()
                 .onData(data, false);
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
deleted file mode 100644
index 2fe6687..0000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.serialize.MultipleSerialization;
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.triple.TripleWrapper;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.ErrorInfo;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketException;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TripleUtil {
-    // Some exceptions are not very useful and add too much noise to the log
-    private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
-    private static final Set<Class<?>> QUIET_EXCEPTIONS_CLASS = new HashSet<>();
-    private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
-    private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder().withoutPadding();
-
-    static {
-        QUIET_EXCEPTIONS.add("NativeIoException");
-        QUIET_EXCEPTIONS_CLASS.add(IOException.class);
-        QUIET_EXCEPTIONS_CLASS.add(SocketException.class);
-    }
-
-    public static boolean isQuiteException(Throwable t) {
-        if (QUIET_EXCEPTIONS_CLASS.contains(t.getClass())) {
-            return true;
-        }
-        return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
-    }
-
-    public static Object unwrapResp(URL url, TripleWrapper.TripleResponseWrapper wrap,
-                                    MultipleSerialization serialization) {
-        String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
-        try {
-            final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
-            final Object ret = serialization.deserialize(url, serializeType, wrap.getType(), bais);
-            bais.close();
-            return ret;
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to unwrap resp", e);
-        }
-    }
-
-    public static Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
-        Map<Class<?>, Object> map = new HashMap<>();
-        try {
-            for (Any any : detailList) {
-                if (any.is(ErrorInfo.class)) {
-                    ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
-                    map.putIfAbsent(ErrorInfo.class, errorInfo);
-                } else if (any.is(DebugInfo.class)) {
-                    DebugInfo debugInfo = any.unpack(DebugInfo.class);
-                    map.putIfAbsent(DebugInfo.class, debugInfo);
-                }
-                // support others type but now only support this
-            }
-        } catch (InvalidProtocolBufferException e) {
-            e.printStackTrace();
-        }
-        return map;
-    }
-
-    public static Object[] unwrapReq(URL url, TripleWrapper.TripleRequestWrapper wrap,
-                                     MultipleSerialization multipleSerialization) {
-        String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
-        try {
-            Object[] arguments = new Object[wrap.getArgsCount()];
-            for (int i = 0; i < arguments.length; i++) {
-                final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getArgs(i).toByteArray());
-                Object obj = multipleSerialization.deserialize(url,
-                    serializeType, wrap.getArgTypes(i), bais);
-                arguments[i] = obj;
-            }
-            return arguments;
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to unwrap req: " + e.getMessage(), e);
-        }
-    }
-
-    public static TripleWrapper.TripleResponseWrapper wrapResp(URL url, String serializeType, Object resp,
-                                                               MethodDescriptor desc,
-                                                               MultipleSerialization multipleSerialization) {
-        try {
-            final TripleWrapper.TripleResponseWrapper.Builder builder = TripleWrapper.TripleResponseWrapper.newBuilder()
-                .setType(desc.getReturnClass().getName())
-                .setSerializeType(convertHessianToWrapper(serializeType));
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            multipleSerialization.serialize(url, serializeType, desc.getReturnClass().getName(), resp, bos);
-            builder.setData(ByteString.copyFrom(bos.toByteArray()));
-            bos.close();
-            return builder.build();
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to pack wrapper req", e);
-        }
-    }
-
-
-    public static TripleWrapper.TripleRequestWrapper wrapReq(URL url, String serializeType, Object req,
-                                                             String type,
-                                                             MultipleSerialization multipleSerialization) {
-        try {
-            final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
-                .addArgTypes(type)
-                .setSerializeType(convertHessianToWrapper(serializeType));
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            multipleSerialization.serialize(url, serializeType, type, req, bos);
-            builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
-            bos.close();
-            return builder.build();
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to pack wrapper req", e);
-        }
-    }
-
-    public static TripleWrapper.TripleRequestWrapper wrapReq(URL url, RpcInvocation invocation,
-                                                             MultipleSerialization serialization) {
-        try {
-            String serializationName = (String) invocation.getObjectAttachment(Constants.SERIALIZATION_KEY);
-            final TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
-                .setSerializeType(convertHessianToWrapper(serializationName));
-            for (int i = 0; i < invocation.getArguments().length; i++) {
-                final String clz = invocation.getParameterTypes()[i].getName();
-                builder.addArgTypes(clz);
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                serialization.serialize(url, serializationName, clz, invocation.getArguments()[i], bos);
-                builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
-            }
-            return builder.build();
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to pack wrapper req", e);
-        }
-    }
-
-    public static <T> T unpack(byte[] data, Class<T> clz) {
-        return unpack(new ByteArrayInputStream(data), clz);
-    }
-
-    public static <T> T unpack(InputStream is, Class<T> clz) {
-        try {
-            final T req = SingleProtobufUtils.deserialize(is, clz);
-            is.close();
-            return req;
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to unpack req", e);
-        } finally {
-            closeQuietly(is);
-        }
-    }
-
-    private static void closeQuietly(Closeable c) {
-        if (c != null) {
-            try {
-                c.close();
-            } catch (IOException ignore) {
-                // ignored
-            }
-        }
-    }
-
-    public static byte[] pack(Object obj) {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try {
-            SingleProtobufUtils.serialize(obj, baos);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to pack protobuf object", e);
-        }
-        return baos.toByteArray();
-    }
-
-    public static String encodeWrapper(URL url, Object obj, String serializeType, MultipleSerialization serialization)
-        throws IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        serialization.serialize(url, serializeType, obj.getClass().getName(), obj, bos);
-        final TripleWrapper.TripleRequestWrapper wrap = TripleWrapper.TripleRequestWrapper.newBuilder()
-            .setSerializeType(convertHessianToWrapper(serializeType))
-            .addArgTypes(obj.getClass().getName())
-            .addArgs(ByteString.copyFrom(bos.toByteArray()))
-            .build();
-        return encodeBase64ASCII(wrap.toByteArray());
-    }
-
-    public static String encodeBase64ASCII(byte[] in) {
-        byte[] bytes = encodeBase64(in);
-        return new String(bytes, StandardCharsets.US_ASCII);
-    }
-
-    public static byte[] encodeBase64(byte[] in) {
-        return BASE64_ENCODER.encode(in);
-    }
-
-    public static Object decodeObjFromHeader(URL url, CharSequence value, MultipleSerialization serialization)
-        throws InvalidProtocolBufferException {
-        final byte[] decode = decodeASCIIByte(value);
-        final TripleWrapper.TripleRequestWrapper wrapper = TripleWrapper.TripleRequestWrapper.parseFrom(decode);
-        final Object[] objects = TripleUtil.unwrapReq(url, wrapper, serialization);
-        return objects[0];
-    }
-
-    public static byte[] decodeASCIIByte(CharSequence value) {
-        return BASE64_DECODER.decode(value.toString().getBytes(StandardCharsets.US_ASCII));
-    }
-
-    public static String convertHessianToWrapper(String serializeType) {
-        if (TripleConstant.HESSIAN2.equals(serializeType)) {
-            return TripleConstant.HESSIAN4;
-        }
-        return serializeType;
-    }
-
-    public static String convertHessianFromWrapper(String serializeType) {
-        if (TripleConstant.HESSIAN4.equals(serializeType)) {
-            return TripleConstant.HESSIAN2;
-        }
-        return serializeType;
-    }
-
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index dc98859..774b98a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -24,9 +24,12 @@ import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.RpcException;
 
 import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.rpc.DebugInfo;
+import com.google.rpc.ErrorInfo;
 import com.google.rpc.Status;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -38,41 +41,67 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
 
     @Override
     protected void doOnStartCall() {
-        asStreamObserver().onNext(getRpcInvocation());
-        asStreamObserver().onCompleted();
+        inboundMessageObserver().onNext(getRpcInvocation());
+        inboundMessageObserver().onCompleted();
     }
 
     @Override
-    protected TransportObserver createTransportObserver() {
-        return new UnaryClientTransportObserver();
+    protected InboundTransportObserver createInboundTransportObserver() {
+        return new ClientUnaryInboundTransportObserver();
     }
 
-    private class UnaryClientTransportObserver extends UnaryTransportObserver implements TransportObserver {
+    // Indexes status details by message class: the first ErrorInfo and the first DebugInfo win and
+    // every other detail type is skipped. An undecodable payload ends the scan; entries collected
+    // so far are still returned. NOTE(review): printStackTrace bypasses the logging framework.
+    private Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
+        Map<Class<?>, Object> detailsByType = new HashMap<>();
+        try {
+            for (Any detail : detailList) {
+                if (detail.is(ErrorInfo.class)) {
+                    detailsByType.putIfAbsent(ErrorInfo.class, detail.unpack(ErrorInfo.class));
+                } else if (detail.is(DebugInfo.class)) {
+                    detailsByType.putIfAbsent(DebugInfo.class, detail.unpack(DebugInfo.class));
+                }
+            }
+        } catch (InvalidProtocolBufferException e) {
+            e.printStackTrace();
+        }
+        return detailsByType;
+    }
+
+    private class ClientUnaryInboundTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
 
         @Override
-        public void doOnComplete() {
-            try {
-                AppResponse result;
-                if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
-                    final Object resp = deserializeResponse(getData());
-                    result = new AppResponse(resp);
+        public void onComplete() {
+            execute(() -> {
+                final GrpcStatus status = extractStatusFromMeta(getHeaders());
+                if (GrpcStatus.Code.isOk(status.code.code)) {
+                    try {
+                        AppResponse result;
+                        if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
+                            final Object resp = deserializeResponse(getData());
+                            result = new AppResponse(resp);
+                        } else {
+                            result = new AppResponse();
+                        }
+                        Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+                        result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
+                        response.setResult(result);
+                        DefaultFuture2.received(getConnection(), response);
+                    } catch (Exception e) {
+                        final GrpcStatus clientStatus = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                            .withCause(e)
+                            .withDescription("Failed to deserialize response");
+                        onError(clientStatus);
+                    }
                 } else {
-                    result = new AppResponse();
+                    onError(status);
                 }
-                Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
-                result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
-                response.setResult(result);
-                DefaultFuture2.received(getConnection(), response);
-            } catch (Exception e) {
-                final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withCause(e)
-                    .withDescription("Failed to deserialize response");
-                onError(status);
-            }
+            });
         }
 
         @Override
-        protected void onError(GrpcStatus status) {
+        public void onError(GrpcStatus status) {
             Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
             response.setErrorMessage(status.description);
             final AppResponse result = new AppResponse();
@@ -96,12 +125,12 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
                 return null;
             }
             final CharSequence raw = metadata.get(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader());
-            byte[] statusDetailBin = TripleUtil.decodeASCIIByte(raw);
+            byte[] statusDetailBin = decodeASCIIByte(raw);
             ClassLoader tccl = Thread.currentThread().getContextClassLoader();
             try {
-                final Status statusDetail = TripleUtil.unpack(statusDetailBin, Status.class);
+                final Status statusDetail = unpack(statusDetailBin, Status.class);
                 List<Any> detailList = statusDetail.getDetailsList();
-                Map<Class<?>, Object> classObjectMap = TripleUtil.tranFromStatusDetails(detailList);
+                Map<Class<?>, Object> classObjectMap = tranFromStatusDetails(detailList);
 
                 // get common exception from DebugInfo
                 DebugInfo debugInfo = (DebugInfo) classObjectMap.get(DebugInfo.class);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
similarity index 62%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
index 377b093..8e9ed66 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
@@ -14,20 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+abstract class UnaryInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
+    protected static final String DUPLICATED_DATA = "Duplicated data";
 
-public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
+    private byte[] data;
 
-        final byte[] data = (byte[]) msg;
-        if (clientStream != null) {
-            clientStream.asTransportObserver()
-                .onData(data, false);
+    public byte[] getData() {
+        return data;
+    }
+
+    @Override
+    public void onData(byte[] in, boolean endStream) {
+        if (data == null) {
+            this.data = in;
+        } else {
+            onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                .withDescription(DUPLICATED_DATA));
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 8c3d98b..663ef48 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -41,18 +41,18 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
     }
 
     @Override
-    protected TransportObserver createTransportObserver() {
+    protected InboundTransportObserver createInboundTransportObserver() {
         return new UnaryServerTransportObserver();
     }
 
-    private class UnaryServerTransportObserver extends UnaryTransportObserver implements TransportObserver {
+    private class UnaryServerTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
         @Override
-        protected void onError(GrpcStatus status) {
+        public void onError(GrpcStatus status) {
             transportError(status);
         }
 
         @Override
-        public void doOnComplete() {
+        public void onComplete() {
             if (getData() != null) {
                 invoke();
             } else {
@@ -82,15 +82,15 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
                     return;
                 }
                 Metadata metadata = createResponseMeta();
-                getTransportSubscriber().onMetadata(metadata, false);
+                outboundTransportObserver().onMetadata(metadata, false);
                 final byte[] data = encodeResponse(response.getValue());
                 if (data == null) {
                     return;
                 }
-                getTransportSubscriber().onData(data, false);
+                outboundTransportObserver().onData(data, false);
                 Metadata trailers = TripleConstant.SUCCESS_RESPONSE_META;
                 convertAttachment(trailers, response.getObjectAttachments());
-                getTransportSubscriber().onMetadata(trailers, true);
+                outboundTransportObserver().onMetadata(trailers, true);
             });
             RpcContext.removeContext();
         }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
index ce6d7be..78a0038 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
@@ -47,6 +47,12 @@ class TransportStateTest {
 
         transportState = new TransportState();
         Assertions.assertTrue(transportState.allowSendMeta());
+
+        transportState = new TransportState();
+        if (transportState.allowSendMeta()) {
+            transportState.setMetaSend();
+        }
+        Assertions.assertFalse(transportState.allowSendMeta());
     }
 
     @Test
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 0cddc3c..0485e15 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -36,7 +36,7 @@ class UnaryClientStreamTest {
     public void testInit() {
         URL url = new ServiceConfigURL("test", "1.2.3.4", 8080);
         final UnaryClientStream stream = UnaryClientStream.unary(url);
-        final StreamObserver<Object> observer = stream.asStreamObserver();
+        final StreamObserver<Object> observer = stream.inboundMessageObserver();
         RpcInvocation inv = Mockito.mock(RpcInvocation.class);
         when(inv.getModuleModel()).thenReturn(ApplicationModel.defaultModel().getDefaultModule());
         // no invoker
@@ -44,7 +44,7 @@ class UnaryClientStreamTest {
         // no subscriber
         Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
 
-        AbstractChannelTransportObserver transportObserver = Mockito.mock(AbstractChannelTransportObserver.class);
+        OutboundTransportObserver transportObserver = Mockito.mock(OutboundTransportObserver.class);
         stream.subscribe(transportObserver);
         // no method descriptor
         Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));