You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/21 06:23:46 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] Refactor transport state and other code (#9057)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new bf3548d  [3.0-Triple] Refactor transport state and other code (#9057)
bf3548d is described below

commit bf3548d50162aff07b3dcc216174a806646f174b
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Oct 21 14:23:32 2021 +0800

    [3.0-Triple] Refactor transport state and other code (#9057)
    
    * refactor(tri): refactor transport state and other code
    
    * fix rat
    
    * remove unused code
    
    * refactor stream call
    
    * fix style
    
    * refactor client stream observer
    
    * fix ut
    
    * finish client refactor
    
    * refactor client construct the stream
    
    * fix ut
    
    * fix comment
    
    * avoid block
    
    * avoid block
    
    * fix ut
    
    * remove response error in util
    
    * Abstract client and server transport
    
    * fix comment
    
    * fix ut
    
    * Optimize interface
    
    * Optimize promise
    
    * Fix ut
    
    * remove unused code
    
    * Start call only when the channel creation is successful
    
    * start call switch user threads
    
    Co-authored-by: guohao <gu...@gmail.com>
---
 .../apache/dubbo/rpc/model/MethodDescriptor.java   |   4 +
 .../tri/AbstractChannelTransportObserver.java      |  76 +++++++
 .../rpc/protocol/tri/AbstractClientStream.java     | 252 ++++++++++++++++++---
 .../rpc/protocol/tri/AbstractServerStream.java     |  60 +++--
 .../dubbo/rpc/protocol/tri/AbstractStream.java     |  95 ++++----
 .../dubbo/rpc/protocol/tri/ClientStream.java       | 112 ++++-----
 .../rpc/protocol/tri/ClientTransportObserver.java  |  90 ++------
 .../apache/dubbo/rpc/protocol/tri/Compressor.java  |   2 +
 .../apache/dubbo/rpc/protocol/tri/GrpcStatus.java  |  38 +++-
 .../dubbo/rpc/protocol/tri/IdentityCompressor.java |   1 -
 .../dubbo/rpc/protocol/tri/ServerStream.java       |  35 ++-
 .../rpc/protocol/tri/ServerTransportObserver.java  |  80 ++++---
 .../org/apache/dubbo/rpc/protocol/tri/Stream.java  |   4 +-
 .../dubbo/rpc/protocol/tri/TransportObserver.java  |  13 +-
 .../dubbo/rpc/protocol/tri/TransportState.java     |  76 +++++++
 .../protocol/tri/TripleClientRequestHandler.java   | 115 ++--------
 .../dubbo/rpc/protocol/tri/TripleConstant.java     |  42 ++--
 .../tri/TripleHttp2FrameServerHandler.java         | 100 +++++---
 .../apache/dubbo/rpc/protocol/tri/TripleUtil.java  |  48 +---
 .../dubbo/rpc/protocol/tri/UnaryClientStream.java  |  93 +++-----
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  | 109 ++-------
 .../dubbo/rpc/protocol/tri/TransportStateTest.java | 116 ++++++++++
 .../rpc/protocol/tri/UnaryClientStreamTest.java    |   5 +-
 23 files changed, 954 insertions(+), 612 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index 91c9887..93148df 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -122,6 +122,10 @@ public class MethodDescriptor {
         return rpcType.equals(RpcType.SERVER_STREAM) || rpcType.equals(RpcType.BIDIRECTIONAL_STREAM) || rpcType.equals(RpcType.CLIENT_STREAM);
     }
 
+    public boolean isServerStream() {
+        return RpcType.SERVER_STREAM.equals(rpcType);
+    }
+
     public boolean isUnary() {
         return rpcType.equals(RpcType.UNARY);
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
new file mode 100644
index 0000000..47c24f3
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractChannelTransportObserver.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import io.netty.handler.codec.http2.Http2Error;
+
+public abstract class AbstractChannelTransportObserver implements TransportObserver {
+    // Transport state; every TransportObserver callback below updates it before delegating to its doOn* hook.
+    protected final TransportState state = new TransportState();
+
+    @Override
+    public void onMetadata(Metadata metadata, boolean endStream) {
+        if (endStream) {
+            state.setEndStreamSend(); // metadata flagged endStream is recorded via setEndStreamSend, not setMetaSend
+        } else {
+            state.setMetaSend();
+        }
+        doOnMetadata(metadata, endStream);
+    }
+
+    @Override
+    public void onData(byte[] data, boolean endStream) {
+        if (endStream) { // data without endStream leaves the state untouched
+            state.setEndStreamSend();
+        }
+        doOnData(data, endStream);
+    }
+
+    @Override
+    public void onReset(Http2Error http2Error) {
+        state.setResetSend();
+        doOnReset(http2Error);
+    }
+
+    @Override
+    public void onComplete() {
+        state.setEndStreamSend(); // completion is recorded the same way as an endStream frame
+        doOnComplete();
+    }
+
+    // Hooks for subclasses; each one is called by the matching callback above after `state` has been updated.
+    protected abstract void doOnMetadata(Metadata metadata, boolean endStream);
+
+    protected abstract void doOnData(byte[] data, boolean endStream);
+
+    protected abstract void doOnReset(Http2Error http2Error);
+
+    protected abstract void doOnComplete();
+
+    // Returns 0 when compressor is null or equals IdentityCompressor.NONE, otherwise 1.
+    protected int calcCompressFlag(Compressor compressor) {
+        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+            return 0;
+        }
+        return 1;
+    }
+
+}
+
+
+
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index b794edb..f7dab7e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -18,34 +18,62 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.api.Connection;
+import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
 import org.apache.dubbo.rpc.CancellationContext;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ConsumerModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.ServiceModel;
 import org.apache.dubbo.triple.TripleWrapper;
 
+import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.util.AsciiString;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 
+import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
 
 public abstract class AbstractClientStream extends AbstractStream implements Stream {
+
+    private final AsciiString scheme;
     private ConsumerModel consumerModel;
     private Connection connection;
+    private RpcInvocation rpcInvocation;
+    private long requestId;
 
     protected AbstractClientStream(URL url) {
         super(url);
+        this.scheme = getSchemeFromUrl(url);
+        // When the client cancels the call, log the cause and send an RST frame to the server.
+        this.getCancellationContext().addListener(context -> {
+            Throwable throwable = this.getCancellationContext().getCancellationCause();
+            if (LOGGER.isWarnEnabled()) {
+                LOGGER.warn("Triple request to "
+                    + getConsumerModel().getServiceName() + "#" + getMethodName() +
+                    " was canceled by local exception ", throwable);
+            }
+            this.asTransportObserver().onReset(getHttp2Error(throwable)); // the reset is issued even when warn logging is off
+        });
     }
 
-    protected AbstractClientStream(URL url, Executor executor) {
-        super(url, executor);
-    }
 
     public static UnaryClientStream unary(URL url) {
         return new UnaryClientStream(url);
@@ -55,38 +83,114 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         return new ClientStream(url);
     }
 
-    public static AbstractClientStream newClientStream(URL url, boolean unary) {
-        AbstractClientStream stream = unary ? unary(url) : stream(url);
-        final CancellationContext cancellationContext = stream.getCancellationContext();
-        // for client cancel,send rst frame to server
-        cancellationContext.addListener(context -> {
-            if (LOGGER.isWarnEnabled()) {
-                Throwable throwable = cancellationContext.getCancellationCause();
-                LOGGER.warn("Cancel by local throwable is ", throwable);
+    public static AbstractClientStream newClientStream(Request req, Connection connection) {
+        final RpcInvocation inv = (RpcInvocation) req.getData();
+        final URL url = inv.getInvoker().getUrl();
+        ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
+        MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel, inv);
+        ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
+        AbstractClientStream stream = methodDescriptor.isUnary() ? unary(url) : stream(url);
+        Compressor compressor = getCompressor(url, consumerModel);
+        stream.request(req)
+            .service(consumerModel)
+            .connection(connection)
+            .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+            .method(methodDescriptor)
+            .setCompressor(compressor);
+        return stream;
+    }
+
+    protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
+        execute(() -> {
+            channel.pipeline()
+                .addLast(new TripleHttp2ClientResponseHandler())
+                .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
+                .addLast(new TripleClientInboundHandler());
+            channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(this);
+            final ClientTransportObserver clientTransportObserver = new ClientTransportObserver(channel, promise);
+            subscribe(clientTransportObserver);
+            try {
+                doOnStartCall();
+            } catch (Throwable throwable) {
+                cancel(throwable);
+                DefaultFuture2.getFuture(getRequestId()).cancel();
             }
-            stream.asTransportObserver().onReset(Http2Error.CANCEL);
         });
-        return stream;
     }
 
-    public AbstractClientStream service(ConsumerModel model) {
-        this.consumerModel = model;
-        return this;
+    protected abstract void doOnStartCall();
+
+    @Override
+    protected StreamObserver<Object> createStreamObserver() {
+        return new ClientStreamObserverImpl(getCancellationContext());
     }
 
-    public ConsumerModel getConsumerModel() {
-        return consumerModel;
+    protected class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+
+        public ClientStreamObserverImpl(CancellationContext cancellationContext) {
+            super(cancellationContext);
+        }
+
+        @Override
+        public void onNext(Object data) {
+            if (getState().allowSendMeta()) { // request metadata is sent only while the state still allows it
+                final Metadata metadata = createRequestMeta(getRpcInvocation());
+                getTransportSubscriber().onMetadata(metadata, false);
+            }
+            if (getState().allowSendData()) { // data is silently dropped when the state no longer allows sending
+                final byte[] bytes = encodeRequest(data);
+                getTransportSubscriber().onData(bytes, false);
+            }
+        }
+
+        /**
+         * Handles all exceptions raised while processing the request: they are reported through transportError
+         * while the end of stream may still be sent, and are only logged otherwise.
+         * The other callbacks, {@link ClientStreamObserver#onNext(Object)} and {@link ClientStreamObserver#onCompleted()}, throw directly.
+         */
+        @Override
+        public void onError(Throwable throwable) {
+            if (getState().allowSendEndStream()) {
+                GrpcStatus status = GrpcStatus.getStatus(throwable);
+                transportError(status, null, getState().allowSendMeta()); // NOTE(review): third argument looks like "onlyTrailers" - confirm
+            } else {
+                if (LOGGER.isErrorEnabled()) {
+                    LOGGER.error("Triple request to "
+                        + getConsumerModel().getServiceName() + "#" + getMethodName() +
+                        " was failed by exception ", throwable);
+                }
+            }
+        }
+
+        @Override
+        public void onCompleted() {
+            if (getState().allowSendEndStream()) { // no-op when the end of stream can no longer be sent
+                getTransportSubscriber().onComplete();
+            }
+        }
+
+        @Override
+        public void setCompression(String compression) {
+            if (!getState().allowSendMeta()) { // too late to change compression: cancel the call instead of throwing
+                cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+                return;
+            }
+            Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+            setCompressor(compressor);
+        }
     }
 
-    public AbstractClientStream connection(Connection connection) {
-        this.connection = connection;
-        return this;
+    @Override
+    protected void cancelByRemoteReset(Http2Error http2Error) {
+        DefaultFuture2.getFuture(getRequestId()).cancel();
     }
 
-    public Connection getConnection() {
-        return connection;
+    @Override
+    protected void cancelByLocal(Throwable throwable) {
+        getCancellationContext().cancel(throwable);
     }
 
+
     @Override
     public void execute(Runnable runnable) {
         try {
@@ -104,6 +208,59 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         }
     }
 
+    public AbstractClientStream service(ConsumerModel model) {
+        this.consumerModel = model;
+        return this;
+    }
+
+    public AbstractClientStream request(Request request) {
+        this.requestId = request.getId();
+        this.rpcInvocation = (RpcInvocation) request.getData();
+        return this;
+    }
+
+    protected RpcInvocation getRpcInvocation() {
+        return this.rpcInvocation;
+    }
+
+    public AsciiString getScheme() {
+        return scheme;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+
+    private AsciiString getSchemeFromUrl(URL url) { // HTTPS only when the URL's SSL_ENABLED_KEY parameter is true
+        try {
+            Boolean ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY, Boolean.class);
+            if (ssl == null) { // parameter absent: default to plain HTTP (also avoids unboxing null below)
+                return TripleConstant.HTTP_SCHEME;
+            }
+            return ssl ? TripleConstant.HTTPS_SCHEME : TripleConstant.HTTP_SCHEME;
+        } catch (Exception e) { // any failure while reading the parameter also falls back to plain HTTP
+            return TripleConstant.HTTP_SCHEME;
+        }
+    }
+
+    private Http2Error getHttp2Error(Throwable throwable) {
+        // todo Convert the exception to http2Error
+        return Http2Error.CANCEL;
+    }
+
+    public ConsumerModel getConsumerModel() {
+        return consumerModel;
+    }
+
+    public AbstractClientStream connection(Connection connection) {
+        this.connection = connection;
+        return this;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
     protected byte[] encodeRequest(Object value) {
         final byte[] out;
         final Object obj;
@@ -114,7 +271,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
             obj = getRequestValue(value);
         }
         out = TripleUtil.pack(obj);
-
         return super.compress(out);
     }
 
@@ -133,7 +289,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
             RpcInvocation invocation = (RpcInvocation) value;
             return invocation.getArguments()[0];
         }
-
         return value;
     }
 
@@ -162,11 +317,16 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
 
     protected Metadata createRequestMeta(RpcInvocation inv) {
         Metadata metadata = new DefaultMetadata();
-        metadata.put(TripleHeaderEnum.PATH_KEY.getHeader(), "/" + inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName())
-            .put(TripleHeaderEnum.AUTHORITY_KEY.getHeader(), getUrl().getAddress())
-            .put(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO)
+        // put http2 params
+        metadata.put(Http2Headers.PseudoHeaderName.SCHEME.value(), this.getScheme())
+            .put(Http2Headers.PseudoHeaderName.PATH.value(), getMethodPath(inv))
+            .put(Http2Headers.PseudoHeaderName.AUTHORITY.value(), getUrl().getAddress())
+            .put(Http2Headers.PseudoHeaderName.METHOD.value(), HttpMethod.POST.asciiName());
+
+        metadata.put(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO)
             .put(TripleHeaderEnum.TIMEOUT.getHeader(), inv.get(CommonConstants.TIMEOUT_KEY) + "m")
-            .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
+            .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS)
+        ;
 
         metadata.putIfNotNull(TripleHeaderEnum.SERVICE_VERSION.getHeader(), getUrl().getVersion())
             .putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
@@ -183,15 +343,33 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         return metadata;
     }
 
-    @Override
-    protected void cancelByRemoteReset(Http2Error http2Error) {
-        DefaultFuture2.getFuture(getRequest().getId()).cancel();
+    private String getMethodPath(RpcInvocation inv) {
+        return "/" + inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName();
     }
 
-    @Override
-    protected void cancelByLocal(Throwable throwable) {
-        getCancellationContext().cancel(throwable);
+    private static Compressor getCompressor(URL url, ServiceModel model) {
+        String compressorStr = url.getParameter(COMPRESSOR_KEY);
+        if (compressorStr == null) {
+            // Compressor can not be set by dynamic config
+            compressorStr = ConfigurationUtils
+                .getCachedDynamicProperty(model.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+        }
+        return Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
     }
 
-
+    /**
+     * Get the tri MethodDescriptor matching inv's method name and parameter types; throws IllegalStateException if none.
+     */
+    private static MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
+        List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
+        if (CollectionUtils.isEmpty(methodDescriptors)) {
+            throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+        }
+        for (MethodDescriptor methodDescriptor : methodDescriptors) {
+            if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
+                return methodDescriptor;
+            }
+        }
+        throw new IllegalStateException("No methodDescriptor matches parameterTypes=" + Arrays.toString(inv.getParameterTypes()) + " method=" + inv.getMethodName()); // descriptors exist for the name, none has these types
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 567f45a..f566641 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -31,7 +31,10 @@ import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.triple.TripleWrapper;
 
 import com.google.protobuf.Message;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
 
 import java.util.Arrays;
 import java.util.List;
@@ -45,9 +48,10 @@ import static org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KE
 public abstract class AbstractServerStream extends AbstractStream implements Stream {
 
     private final ProviderModel providerModel;
+    private final List<HeaderFilter> headerFilters;
+    private ServiceDescriptor serviceDescriptor;
     private List<MethodDescriptor> methodDescriptors;
     private Invoker<?> invoker;
-    private final List<HeaderFilter> headerFilters;
 
     protected AbstractServerStream(URL url) {
         this(url, lookupProviderModel(url));
@@ -102,6 +106,14 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         return this;
     }
 
+    public ServiceDescriptor getServiceDescriptor() {
+        return serviceDescriptor;
+    }
+
+    public void setServiceDescriptor(ServiceDescriptor serviceDescriptor) {
+        this.serviceDescriptor = serviceDescriptor;
+    }
+
     public Invoker<?> getInvoker() {
         return invoker;
     }
@@ -134,7 +146,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         ClassLoader tccl = Thread.currentThread().getContextClassLoader();
         try {
             if (getProviderModel() != null) {
-                ClassLoadUtil.switchContextLoader(getProviderModel().getClassLoader());
+                ClassLoadUtil.switchContextLoader(getProviderModel().getServiceInterfaceClass().getClassLoader());
             }
             if (getMethodDescriptor() == null || getMethodDescriptor().isNeedWrap()) {
                 final TripleWrapper.TripleRequestWrapper wrapper = TripleUtil.unpack(data,
@@ -160,16 +172,18 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
                         transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
                             .withDescription("Method :" + getMethodName() + "[" + Arrays.toString(paramTypes) + "] " +
                                 "not found of service:" + getServiceDescriptor().getServiceName()));
-
                         return null;
                     }
                 }
-
                 return TripleUtil.unwrapReq(getUrl(), wrapper, getMultipleSerialization());
             } else {
                 return new Object[]{TripleUtil.unpack(data, getMethodDescriptor().getParameterClasses()[0])};
             }
-
+        } catch (Throwable throwable) {
+            LOGGER.warn("Decode request failed:", throwable);
+            transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                .withDescription("Decode request failed:" + throwable.getMessage()));
+            return null;
         } finally {
             ClassLoadUtil.switchContextLoader(tccl);
         }
@@ -178,23 +192,39 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
     /**
      * create basic meta data
      */
-    protected Metadata createRequestMeta() {
+    protected Metadata createResponseMeta() {
         Metadata metadata = new DefaultMetadata();
+        metadata.put(Http2Headers.PseudoHeaderName.STATUS.value(), HttpResponseStatus.OK.codeAsText());
+        metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
         metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), super.getCompressor().getMessageEncoding())
-            .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
+            .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), getAcceptEncoding());
         return metadata;
     }
 
     protected byte[] encodeResponse(Object value) {
-        final com.google.protobuf.Message message;
-        if (getMethodDescriptor().isNeedWrap()) {
-            message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
-                getMultipleSerialization());
-        } else {
-            message = (Message) value;
+        final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+        try {
+            if (getProviderModel() != null) {
+                ClassLoadUtil.switchContextLoader(getProviderModel().getServiceInterfaceClass().getClassLoader());
+            }
+            final Message message;
+            if (getMethodDescriptor().isNeedWrap()) {
+                message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value, getMethodDescriptor(),
+                    getMultipleSerialization());
+            } else {
+                message = (Message) value;
+            }
+            byte[] out = TripleUtil.pack(message);
+            return super.compress(out);
+        } catch (Throwable throwable) {
+            LOGGER.error("Encode Response data error ", throwable);
+            transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
+                .withCause(throwable)
+                .withDescription("Encode Response data error"));
+            return null;
+        } finally {
+            ClassLoadUtil.switchContextLoader(tccl);
         }
-        byte[] out = TripleUtil.pack(message);
-        return super.compress(out);
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 1f4f19a..aaac6d8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -25,11 +25,9 @@ import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.config.Constants;
-import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.rpc.CancellationContext;
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
 
 import com.google.protobuf.Any;
@@ -52,13 +50,15 @@ public abstract class AbstractStream implements Stream {
     private final TransportObserver transportObserver;
     private final Executor executor;
     private final CancellationContext cancellationContext;
-    private ServiceDescriptor serviceDescriptor;
+    // AcceptEncoding does not change after the application is started,
+    // so it can be obtained when constructing the stream
+    private final String acceptEncoding;
+
     private MethodDescriptor methodDescriptor;
     private String methodName;
-    private Request request;
     private String serializeType;
     private StreamObserver<Object> streamSubscriber;
-    private TransportObserver transportSubscriber;
+    private AbstractChannelTransportObserver transportSubscriber;
     private Compressor compressor = IdentityCompressor.NONE;
     private Compressor deCompressor = IdentityCompressor.NONE;
     private volatile boolean cancelled = false;
@@ -77,14 +77,7 @@ public abstract class AbstractStream implements Stream {
         this.cancellationContext = new CancellationContext();
         this.transportObserver = createTransportObserver();
         this.streamObserver = createStreamObserver();
-    }
-
-    public boolean isCancelled() {
-        return cancelled;
-    }
-
-    protected CancellationContext getCancellationContext() {
-        return cancellationContext;
+        this.acceptEncoding = Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel());
     }
 
     private Executor lookupExecutor(URL url, Executor executor) {
@@ -106,13 +99,20 @@ public abstract class AbstractStream implements Stream {
         return new SerializingExecutor(executor);
     }
 
-    public Request getRequest() {
-        return request;
+    public String getAcceptEncoding() {
+        return acceptEncoding;
     }
 
-    public AbstractStream request(Request request) {
-        this.request = request;
-        return this;
+    public TransportState getState() {
+        return transportSubscriber.state;
+    }
+
+    public boolean isCancelled() {
+        return cancelled;
+    }
+
+    protected CancellationContext getCancellationContext() {
+        return cancellationContext;
     }
 
     @Override
@@ -131,6 +131,7 @@ public abstract class AbstractStream implements Stream {
 
     public AbstractStream method(MethodDescriptor md) {
         this.methodDescriptor = md;
+        this.methodName = md.getMethodName();
         return this;
     }
 
@@ -163,8 +164,14 @@ public abstract class AbstractStream implements Stream {
 
     protected abstract void cancelByLocal(Throwable throwable);
 
+    /**
+     * create request StreamObserver
+     */
     protected abstract StreamObserver<Object> createStreamObserver();
 
+    /**
+     * create response TransportObserver
+     */
     protected abstract TransportObserver createTransportObserver();
 
     public String getSerializeType() {
@@ -172,8 +179,8 @@ public abstract class AbstractStream implements Stream {
     }
 
     public AbstractStream serialize(String serializeType) {
-        if ("hessian4".equals(serializeType)) {
-            serializeType = "hessian2";
+        if (TripleConstant.HESSIAN4.equals(serializeType)) {
+            serializeType = TripleConstant.HESSIAN2;
         }
         this.serializeType = serializeType;
         return this;
@@ -195,14 +202,6 @@ public abstract class AbstractStream implements Stream {
         return methodDescriptor;
     }
 
-    public ServiceDescriptor getServiceDescriptor() {
-        return serviceDescriptor;
-    }
-
-    public void setServiceDescriptor(ServiceDescriptor serviceDescriptor) {
-        this.serviceDescriptor = serviceDescriptor;
-    }
-
     public Compressor getCompressor() {
         return this.compressor;
     }
@@ -256,7 +255,7 @@ public abstract class AbstractStream implements Stream {
     }
 
     @Override
-    public void subscribe(TransportObserver observer) {
+    public void subscribe(AbstractChannelTransportObserver observer) {
         this.transportSubscriber = observer;
     }
 
@@ -284,7 +283,7 @@ public abstract class AbstractStream implements Stream {
         }
         getTransportSubscriber().onMetadata(trailers, true);
         if (LOGGER.isErrorEnabled()) {
-            LOGGER.error("[Triple-Server-Error] status=" + status.code.code + " service=" + getServiceDescriptor().getServiceName()
+            LOGGER.error("[Triple-Error] status=" + status.code.code
                 + " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
         }
     }
@@ -349,7 +348,7 @@ public abstract class AbstractStream implements Stream {
             if (TripleHeaderEnum.containsExcludeAttachments(key)) {
                 continue;
             }
-            if (key.endsWith("-bin") && key.length() > 4) {
+            if (key.endsWith(TripleConstant.GRPC_BIN_SUFFIX) && key.length() > 4) {
                 try {
                     attachments.put(key.substring(0, key.length() - 4), TripleUtil.decodeASCIIByte(header.getValue()));
                 } catch (Exception e) {
@@ -363,6 +362,9 @@ public abstract class AbstractStream implements Stream {
     }
 
     protected void convertAttachment(Metadata metadata, Map<String, Object> attachments) {
+        if (attachments == null) {
+            return;
+        }
         for (Map.Entry<String, Object> entry : attachments.entrySet()) {
             final String key = entry.getKey().toLowerCase(Locale.ROOT);
             if (Http2Headers.PseudoHeaderName.isPseudoHeader(key)) {
@@ -376,6 +378,13 @@ public abstract class AbstractStream implements Stream {
         }
     }
 
+    /**
+     * Convert a single user attachment value into a metadata entry.
+     *
+     * @param metadata {@link Metadata} that receives the converted entry
+     * @param key      metadata key
+     * @param v        attachment value (only String and byte[] values are written to the metadata)
+     */
     private void convertSingleAttachment(Metadata metadata, String key, Object v) {
         try {
             if (v instanceof String) {
@@ -383,7 +392,7 @@ public abstract class AbstractStream implements Stream {
                 metadata.put(key, str);
             } else if (v instanceof byte[]) {
                 String str = TripleUtil.encodeBase64ASCII((byte[]) v);
-                metadata.put(key + "-bin", str);
+                metadata.put(key + TripleConstant.GRPC_BIN_SUFFIX, str);
             }
         } catch (Throwable t) {
             LOGGER.warn("Meet exception when convert single attachment key:" + key + " value=" + v, t);
@@ -412,7 +421,10 @@ public abstract class AbstractStream implements Stream {
 
         @Override
         public void onReset(Http2Error http2Error) {
-            getTransportSubscriber().onReset(http2Error);
+            if (getState().allowSendReset()) {
+                getState().setResetSend();
+                getTransportSubscriber().onReset(http2Error);
+            }
         }
 
         @Override
@@ -454,14 +466,19 @@ public abstract class AbstractStream implements Stream {
 
         @Override
         public void onComplete() {
-            final GrpcStatus status = extractStatusFromMeta(getHeaders());
-            if (Code.isOk(status.code.code)) {
-                doOnComplete();
-            } else {
-                onError(status);
-            }
+            execute(() -> {
+                final GrpcStatus status = extractStatusFromMeta(getHeaders());
+                if (Code.isOk(status.code.code)) {
+                    doOnComplete();
+                } else {
+                    onError(status);
+                }
+            });
         }
 
+        /**
+         * Exceptions thrown inside this method must be caught by the implementing class.
+         */
         protected abstract void doOnComplete();
 
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 42e7089..b868d87 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -19,8 +19,10 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
+import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcInvocation;
 
 public class ClientStream extends AbstractClientStream implements Stream {
 
@@ -29,75 +31,81 @@ public class ClientStream extends AbstractClientStream implements Stream {
     }
 
     @Override
-    protected StreamObserver<Object> createStreamObserver() {
-        return new ClientStreamObserverImpl(getCancellationContext());
+    protected TransportObserver createTransportObserver() {
+        return new ClientTransportObserverImpl();
     }
 
     @Override
-    protected TransportObserver createTransportObserver() {
-        return new AbstractTransportObserver() {
-
-            @Override
-            public void onData(byte[] data, boolean endStream) {
-                execute(() -> {
-                    final Object resp = deserializeResponse(data);
-                    getStreamSubscriber().onNext(resp);
-                });
-            }
-
-            @Override
-            public void onComplete() {
-                execute(() -> {
-                    final GrpcStatus status = extractStatusFromMeta(getHeaders());
-
-                    if (GrpcStatus.Code.isOk(status.code.code)) {
-                        getStreamSubscriber().onCompleted();
-                    } else {
-                        getStreamSubscriber().onError(status.asException());
-                    }
-                });
-            }
-        };
+    protected void doOnStartCall() {
+        Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+        AppResponse result = getMethodDescriptor().isServerStream() ? callServerStream() : callBiStream();
+        response.setResult(result);
+        DefaultFuture2.received(getConnection(), response);
     }
 
-    private class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+    private AppResponse callServerStream() {
+        StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[1];
+        obServer = attachCancelContext(obServer, getCancellationContext());
+        subscribe(obServer);
+        asStreamObserver().onNext(getRpcInvocation().getArguments()[0]);
+        asStreamObserver().onCompleted();
+        return new AppResponse();
+    }
 
-        private boolean metaSent;
+    private AppResponse callBiStream() {
+        StreamObserver<Object> obServer = (StreamObserver<Object>) getRpcInvocation().getArguments()[0];
+        obServer = attachCancelContext(obServer, getCancellationContext());
+        subscribe(obServer);
+        return new AppResponse(asStreamObserver());
+    }
 
-        public ClientStreamObserverImpl(CancellationContext cancellationContext) {
-            super(cancellationContext);
-            this.metaSent = false;
+    private <T> StreamObserver<T> attachCancelContext(StreamObserver<T> observer, CancellationContext context) {
+        if (observer instanceof CancelableStreamObserver) {
+            CancelableStreamObserver<T> streamObserver = (CancelableStreamObserver<T>) observer;
+            streamObserver.setCancellationContext(context);
+            return streamObserver;
         }
+        return observer;
+    }
 
-        @Override
-        public void onNext(Object data) {
-            if (!metaSent) {
-                metaSent = true;
-                final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
-                getTransportSubscriber().onMetadata(metadata, false);
-            }
-            final byte[] bytes = encodeRequest(data);
-            getTransportSubscriber().onData(bytes, false);
-        }
+    private class ClientTransportObserverImpl extends AbstractTransportObserver {
+
+        private boolean error = false;
 
         @Override
-        public void onError(Throwable throwable) {
-            transportError(throwable);
+        public void onData(byte[] data, boolean endStream) {
+            execute(() -> {
+                try {
+                    final Object resp = deserializeResponse(data);
+                    getStreamSubscriber().onNext(resp);
+                } catch (Throwable throwable) {
+                    onError(throwable);
+                }
+            });
         }
 
         @Override
-        public void onCompleted() {
-            getTransportSubscriber().onComplete();
+        public void onComplete() {
+            execute(() -> {
+                getState().setServerEndStreamReceived();
+                final GrpcStatus status = extractStatusFromMeta(getHeaders());
+                if (GrpcStatus.Code.isOk(status.code.code)) {
+                    getStreamSubscriber().onCompleted();
+                } else {
+                    onError(status.cause);
+                }
+            });
         }
 
-        @Override
-        public void setCompression(String compression) {
-            if (metaSent) {
-                cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+        private void onError(Throwable throwable) {
+            if (error) {
                 return;
             }
-            Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
-            setCompressor(compressor);
+            error = true;
+            if (!getState().serverSendStreamReceived()) {
+                cancel(throwable);
+            }
+            getStreamSubscriber().onError(throwable);
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 3715e98..1a53881 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -17,10 +17,11 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
 import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
@@ -28,86 +29,47 @@ import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
 import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
-import io.netty.util.AsciiString;
 
-public class ClientTransportObserver implements TransportObserver {
-    private final AsciiString SCHEME;
-    private final ChannelHandlerContext ctx;
-    private final Http2StreamChannel streamChannel;
-    private final ChannelPromise promise;
-    private boolean headerSent = false;
-    private boolean endStreamSent = false;
-    private boolean resetSent = false;
+public class ClientTransportObserver extends AbstractChannelTransportObserver {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClientTransportObserver.class);
+    private final ChannelPromise promise;
+    private final Http2StreamChannel streamChannel;
 
-    public ClientTransportObserver(ChannelHandlerContext ctx, AbstractClientStream stream, ChannelPromise promise) {
-        this.ctx = ctx;
+    public ClientTransportObserver(Http2StreamChannel channel, ChannelPromise promise) {
+        this.streamChannel = channel;
         this.promise = promise;
-        Boolean ssl = ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).get();
-        if (ssl != null && ssl) {
-            SCHEME = TripleConstant.HTTPS_SCHEME;
-        } else {
-            SCHEME = TripleConstant.HTTP_SCHEME;
-        }
-
-        final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
-        streamChannel = streamChannelBootstrap.open().syncUninterruptibly().getNow();
-
-        final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
-        streamChannel.pipeline().addLast(responseHandler)
-            .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
-            .addLast(new TripleClientInboundHandler());
-        streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
     }
 
     @Override
-    public void onMetadata(Metadata metadata, boolean endStream) {
-        if (headerSent) {
-            return;
-        }
-        if (resetSent) {
-            return;
-        }
-        final Http2Headers headers = new DefaultHttp2Headers(true)
-            .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
-            .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
-            .scheme(SCHEME)
-            .method(HttpMethod.POST.asciiName());
+    protected void doOnMetadata(Metadata metadata, boolean endStream) {
+        final Http2Headers headers = new DefaultHttp2Headers(true);
         metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
-        headerSent = true;
         streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
             .addListener(future -> {
                 if (!future.isSuccess()) {
                     promise.tryFailure(future.cause());
                 }
             });
-
     }
 
     @Override
-    public void onReset(Http2Error http2Error) {
-        resetSent = true;
-        streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+    protected void doOnData(byte[] data, boolean endStream) {
+        ByteBuf buf = streamChannel.alloc().buffer();
+        buf.writeByte(getCompressFlag());
+        buf.writeInt(data.length);
+        buf.writeBytes(data);
+        streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
             .addListener(future -> {
-                if (future.isSuccess()) {
-                    promise.trySuccess();
-                } else {
+                if (!future.isSuccess()) {
                     promise.tryFailure(future.cause());
                 }
             });
     }
 
     @Override
-    public void onData(byte[] data, boolean endStream) {
-        if (resetSent) {
-            return;
-        }
-        ByteBuf buf = ctx.alloc().buffer();
-        buf.writeByte(getCompressFlag());
-        buf.writeInt(data.length);
-        buf.writeBytes(data);
-        streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
+    protected void doOnReset(Http2Error http2Error) {
+        streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
             .addListener(future -> {
                 if (future.isSuccess()) {
                     promise.trySuccess();
@@ -118,14 +80,7 @@ public class ClientTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onComplete() {
-        if (resetSent) {
-            return;
-        }
-        if (endStreamSent) {
-            return;
-        }
-        endStreamSent = true;
+    protected void doOnComplete() {
         streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
             .addListener(future -> {
                 if (future.isSuccess()) {
@@ -138,7 +93,6 @@ public class ClientTransportObserver implements TransportObserver {
 
     private int getCompressFlag() {
         AbstractClientStream stream = streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).get();
-        return TransportObserver.calcCompressFlag(stream.getCompressor());
+        return calcCompressFlag(stream.getCompressor());
     }
-
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
index c6a2791..09c6e6b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -34,6 +34,8 @@ import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 @SPI(value = DEFAULT_COMPRESSOR, scope = ExtensionScope.FRAMEWORK)
 public interface Compressor {
 
+    Compressor NONE = new IdentityCompressor();
+
     String DEFAULT_COMPRESSOR = "identity";
 
     static Compressor getCompressor(FrameworkModel frameworkModel, String compressorStr) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index c21ec32..56a3a21 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.rpc.RpcException;
 
@@ -38,8 +39,8 @@ import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code.UNAVAILABLE;
 
 public class GrpcStatus {
     public final Code code;
-    public final Throwable cause;
-    public final String description;
+    public Throwable cause;
+    public String description;
 
     public GrpcStatus(Code code, Throwable cause, String description) {
         this.code = code;
@@ -55,6 +56,10 @@ public class GrpcStatus {
         return new GrpcStatus(code, null, null);
     }
 
+    public static GrpcStatus fromCodeWithDescription(Code code, String description) {
+        return new GrpcStatus(code, null, description);
+    }
+
     public static byte toDubboStatus(Code code) {
         byte status;
         switch (code) {
@@ -90,7 +95,26 @@ public class GrpcStatus {
         return status;
     }
 
-    public static GrpcStatus rpcExceptionCodeToGrpc(int rpcExceptionCode) {
+    /**
+     * TODO: convert the remaining exception types to a dedicated status.
+     */
+    public static GrpcStatus getStatus(Throwable throwable) {
+        return getStatus(throwable, null);
+    }
+
+    public static GrpcStatus getStatus(Throwable throwable, String description) {
+        if (throwable instanceof RpcException) {
+            RpcException rpcException = (RpcException) throwable;
+            Code code = rpcExceptionCodeToGrpcCode(rpcException.getCode());
+            return new GrpcStatus(code, throwable, description);
+        }
+        if (throwable instanceof TimeoutException) {
+            return new GrpcStatus(GrpcStatus.Code.DEADLINE_EXCEEDED, throwable, description);
+        }
+        return new GrpcStatus(Code.UNKNOWN, throwable, description);
+    }
+
+    public static Code rpcExceptionCodeToGrpcCode(int rpcExceptionCode) {
         Code code;
         switch (rpcExceptionCode) {
             case TIMEOUT_EXCEPTION:
@@ -114,7 +138,7 @@ public class GrpcStatus {
                 code = Code.UNKNOWN;
                 break;
         }
-        return fromCode(code);
+        return code;
     }
 
     public static String limitSizeTo4KB(String desc) {
@@ -133,11 +157,13 @@ public class GrpcStatus {
     }
 
     public GrpcStatus withCause(Throwable cause) {
-        return new GrpcStatus(this.code, cause, this.description);
+        this.cause = cause;
+        return this;
     }
 
     public GrpcStatus withDescription(String description) {
-        return new GrpcStatus(this.code, this.cause, description);
+        this.description = description;
+        return this;
     }
 
     public RpcException asException() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
index a8ac2fc..08d27b7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
@@ -22,7 +22,6 @@ package org.apache.dubbo.rpc.protocol.tri;
  */
 public class IdentityCompressor implements Compressor {
 
-    public static final Compressor NONE = new IdentityCompressor();
 
     @Override
     public String getMessageEncoding() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 7958b61..86a811d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -40,20 +40,26 @@ public class ServerStream extends AbstractServerStream implements Stream {
     }
 
     private class ServerStreamObserverImpl implements ServerStreamObserver<Object> {
-        private boolean headersSent;
 
         @Override
         public void onNext(Object data) {
-            if (!headersSent) {
-                getTransportSubscriber().onMetadata(createRequestMeta(), false);
-                headersSent = true;
+            if (getState().allowSendMeta()) {
+                getTransportSubscriber().onMetadata(createResponseMeta(), false);
             }
             final byte[] bytes = encodeResponse(data);
-            getTransportSubscriber().onData(bytes, false);
+            if (bytes == null) {
+                return;
+            }
+            if (getState().allowSendData()) {
+                getTransportSubscriber().onData(bytes, false);
+            }
         }
 
         @Override
         public void onError(Throwable throwable) {
+            if (!getState().allowSendEndStream()) {
+                return;
+            }
             final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                 .withCause(throwable)
                 .withDescription("Biz exception");
@@ -62,15 +68,15 @@ public class ServerStream extends AbstractServerStream implements Stream {
 
         @Override
         public void onCompleted() {
-            Metadata metadata = new DefaultMetadata();
-            metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), "OK");
-            metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
-            getTransportSubscriber().onMetadata(metadata, true);
+            if (!getState().allowSendEndStream()) {
+                return;
+            }
+            getTransportSubscriber().onMetadata(TripleConstant.SUCCESS_RESPONSE_META, true);
         }
 
         @Override
         public void setCompression(String compression) {
-            if (headersSent) {
+            if (!getState().allowSendMeta()) {
                 final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                     .withDescription("Metadata already has been sent,can not set compression");
                 transportError(status);
@@ -110,6 +116,10 @@ public class ServerStream extends AbstractServerStream implements Stream {
                     final RpcInvocation inv = buildInvocation(metadata);
                     inv.setArguments(new Object[]{asStreamObserver()});
                     final Result result = getInvoker().invoke(inv);
+                    if (result.hasException()) {
+                        transportError(GrpcStatus.getStatus(result.getException()));
+                        return;
+                    }
                     try {
                         subscribe((StreamObserver<Object>) result.getValue());
                     } catch (Throwable t) {
@@ -168,7 +178,10 @@ public class ServerStream extends AbstractServerStream implements Stream {
                 final Object[] arguments = deserializeRequest(in);
                 if (arguments != null) {
                     inv.setArguments(new Object[]{arguments[0], asStreamObserver()});
-                    getInvoker().invoke(inv);
+                    final Result result = getInvoker().invoke(inv);
+                    if (result.hasException()) {
+                        transportError(GrpcStatus.getStatus(result.getException()));
+                    }
                 }
             } finally {
                 RpcContext.removeCancellationContext();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index b25fef9..0e09ad4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -21,42 +21,31 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
 import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
 import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
 import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
 
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-
-public class ServerTransportObserver implements TransportObserver {
+public class ServerTransportObserver extends AbstractChannelTransportObserver {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerTransportObserver.class);
 
     private final ChannelHandlerContext ctx;
-    private boolean headerSent = false;
-    private boolean resetSent = false;
 
     public ServerTransportObserver(ChannelHandlerContext ctx) {
         this.ctx = ctx;
     }
 
-    @Override
-    public void onMetadata(Metadata metadata, boolean endStream) {
-        if (resetSent) {
-            return;
-        }
-        final DefaultHttp2Headers headers = new DefaultHttp2Headers(true);
-        metadata.forEach(e -> {
-            headers.set(e.getKey(), e.getValue());
-        });
-        if (!headerSent) {
-            headerSent = true;
-            headers.status(OK.codeAsText());
-            headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
+    public void onMetadata(Http2Headers headers, boolean endStream) {
+        if (endStream) {
+            state.setEndStreamSend();
+        } else {
+            state.setMetaSend();
         }
-        // If endStream is true, the channel will be closed, so you cannot listen for errors and continue sending any frame
         ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
             .addListener(future -> {
                 if (!future.isSuccess()) {
@@ -66,8 +55,33 @@ public class ServerTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onReset(Http2Error http2Error) {
-        resetSent = true;
+    public void onMetadata(Metadata metadata, boolean endStream) {
+        doOnMetadata(metadata, endStream);
+    }
+
+    @Override
+    public void onData(byte[] data, boolean endStream) {
+        doOnData(data, endStream);
+    }
+
+    @Override
+    protected void doOnMetadata(Metadata metadata, boolean endStream) {
+        final DefaultHttp2Headers headers = new DefaultHttp2Headers(true);
+        metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
+        onMetadata(headers, endStream);
+    }
+
+    @Override
+    protected void doOnData(byte[] data, boolean endStream) {
+        ByteBuf buf = ctx.alloc().buffer();
+        buf.writeByte(getCompressFlag());
+        buf.writeInt(data.length);
+        buf.writeBytes(data);
+        onData(buf, endStream);
+    }
+
+    @Override
+    protected void doOnReset(Http2Error http2Error) {
         ctx.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
             .addListener(future -> {
                 if (!future.isSuccess()) {
@@ -77,15 +91,20 @@ public class ServerTransportObserver implements TransportObserver {
     }
 
     @Override
-    public void onData(byte[] data, boolean endStream) {
-        if (resetSent) {
-            return;
+    protected void doOnComplete() {
+
+    }
+
+    public void onData(String str, boolean endStream) {
+        ByteBuf buf = ByteBufUtil.writeUtf8(ctx.alloc(), str);
+        onData(buf, endStream);
+    }
+
+    public void onData(ByteBuf buf, boolean endStream) {
+        if (endStream) {
+            state.setEndStreamSend();
         }
-        ByteBuf buf = ctx.alloc().buffer();
-        buf.writeByte(getCompressFlag());
-        buf.writeInt(data.length);
-        buf.writeBytes(data);
-        ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
+        ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
             .addListener(future -> {
                 if (!future.isSuccess()) {
                     LOGGER.warn("send data error endStream=" + endStream, future.cause());
@@ -93,9 +112,8 @@ public class ServerTransportObserver implements TransportObserver {
             });
     }
 
-
     private int getCompressFlag() {
         AbstractServerStream stream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
-        return TransportObserver.calcCompressFlag(stream.getCompressor());
+        return calcCompressFlag(stream.getCompressor());
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
index de83eab..358cc33 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
@@ -23,7 +23,7 @@ import org.apache.dubbo.common.stream.StreamObserver;
 /**
  * Stream acts as a bi-directional intermediate layer for streaming data processing. It serializes object instance to
  * byte[] then send to remote, and deserializes byte[] to object instance from remote. {@link #asTransportObserver()}
- * and {@link #subscribe(TransportObserver)} provide {@link TransportObserver} to send or receive remote data.
+ * and {@link #subscribe(AbstractChannelTransportObserver)} provide {@link TransportObserver} to send or receive remote data.
  * {@link #asStreamObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
  * as API for users fetching/emitting objects from/to remote peer.
  */
@@ -36,7 +36,7 @@ public interface Stream {
      *
      * @param observer receives remote byte[] data
      */
-    void subscribe(TransportObserver observer);
+    void subscribe(AbstractChannelTransportObserver observer);
 
     /**
      * Get a downstream data observer for writing byte[] data to this stream
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 015a87e..7e5bc8f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -21,21 +21,12 @@ import io.netty.handler.codec.http2.Http2Error;
 
 public interface TransportObserver {
 
-    static int calcCompressFlag(Compressor compressor) {
-        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
-            return 0;
-        }
-        return 1;
-    }
-
     void onMetadata(Metadata metadata, boolean endStream);
 
     void onData(byte[] data, boolean endStream);
 
-    default void onReset(Http2Error http2Error) {
-    }
+    void onReset(Http2Error http2Error);
 
-    default void onComplete() {
-    }
+    void onComplete();
 
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
new file mode 100644
index 0000000..4160fd6
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportState.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+public class TransportState {
+
+    private volatile int state = 0;
+    private static final int META_SEND = 0b00000000000000000000000000000001;
+    private static final int RESET_SEND = 0b00000000000000000000000000000010;
+    private static final int END_STREAM_SEND = 0b00000000000000000000000000000100;
+    private static final int SERVER_SEND_STREAM_RECEIVED = 0b00000000000000000000000000001000;
+
+    private static final int ALLOW_META_SEND = 0b00000000000000000000000000000000;
+    private static final int ALLOW_DATA_SEND = META_SEND;
+    private static final int ALLOW_END_STREAM_SEND = META_SEND;
+    private static final int ALLOW_RESET_SEND = 0b00000000000000000000000000000001;
+
+    public TransportState() {
+    }
+
+    public void setState(int state) {
+        this.state = state;
+    }
+
+    public void setMetaSend() {
+        this.state = this.state | META_SEND;
+    }
+
+    public void setResetSend() {
+        this.state = this.state | RESET_SEND;
+    }
+
+    public void setEndStreamSend() {
+        this.state = this.state | END_STREAM_SEND;
+    }
+
+    public void setServerEndStreamReceived() {
+        this.state = this.state | SERVER_SEND_STREAM_RECEIVED;
+    }
+
+    public boolean serverSendStreamReceived() {
+        return (this.state & SERVER_SEND_STREAM_RECEIVED) > 0;
+    }
+
+    public boolean allowSendMeta() {
+        return this.state == ALLOW_META_SEND;
+    }
+
+    public boolean allowSendReset() {
+        return (this.state & RESET_SEND) != RESET_SEND;
+    }
+
+    public boolean allowSendData() {
+        return this.state == ALLOW_DATA_SEND;
+    }
+
+    public boolean allowSendEndStream() {
+        return this.state == ALLOW_END_STREAM_SEND;
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 83928d2..f215c27 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -16,33 +16,16 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.api.Connection;
 import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
-import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
 
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
-import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
 
 public class TripleClientRequestHandler extends ChannelDuplexHandler {
 
@@ -63,85 +46,19 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
 
     private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
         DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
-        final RpcInvocation inv = (RpcInvocation) req.getData();
-        final URL url = inv.getInvoker().getUrl();
-        ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
-
-        MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel, inv);
-
-        ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
-        final AbstractClientStream stream = AbstractClientStream.newClientStream(url, methodDescriptor.isUnary());
-
-        String ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY);
-        if (StringUtils.isNotEmpty(ssl)) {
-            ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
-        }
-        // Compressor can not be set by dynamic config
-        String compressorStr = ConfigurationUtils
-            .getCachedDynamicProperty(inv.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
-
-        Compressor compressor = Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
-        if (compressor != null) {
-            stream.setCompressor(compressor);
-        }
-
-        stream.service(consumerModel)
-            .connection(Connection.getConnectionFromChannel(ctx.channel()))
-            .method(methodDescriptor)
-            .methodName(methodDescriptor.getMethodName())
-            .request(req)
-            .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
-            .subscribe(new ClientTransportObserver(ctx, stream, promise));
-
-        if (methodDescriptor.isUnary()) {
-            stream.asStreamObserver().onNext(inv);
-            stream.asStreamObserver().onCompleted();
-        } else {
-            Response response = new Response(req.getId(), req.getVersion());
-            AppResponse result;
-            // the stream method params is fixed
-            if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
-                || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
-                StreamObserver<Object> obServer = (StreamObserver<Object>) inv.getArguments()[0];
-                obServer = attachCancelContext(obServer, stream.getCancellationContext());
-                stream.subscribe(obServer);
-                result = new AppResponse(stream.asStreamObserver());
-            } else {
-                StreamObserver<Object> obServer = (StreamObserver<Object>) inv.getArguments()[1];
-                obServer = attachCancelContext(obServer, stream.getCancellationContext());
-                stream.subscribe(obServer);
-                result = new AppResponse();
-                stream.asStreamObserver().onNext(inv.getArguments()[0]);
-                stream.asStreamObserver().onCompleted();
-            }
-            response.setResult(result);
-            DefaultFuture2.received(stream.getConnection(), response);
-        }
-    }
-
-    /**
-     * Get the tri protocol special MethodDescriptor
-     */
-    private MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
-        List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
-        if (CollectionUtils.isEmpty(methodDescriptors)) {
-            throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
-        }
-        for (MethodDescriptor methodDescriptor : methodDescriptors) {
-            if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
-                return methodDescriptor;
-            }
-        }
-        throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
-    }
-
-
-    public <T> StreamObserver<T> attachCancelContext(StreamObserver<T> observer, CancellationContext context) {
-        if (observer instanceof CancelableStreamObserver) {
-            CancelableStreamObserver<T> streamObserver = ((CancelableStreamObserver<T>) observer);
-            streamObserver.setCancellationContext(context);
-            return streamObserver;
-        }
-        return observer;
+        Connection connection = Connection.getConnectionFromChannel(ctx.channel());
+        final AbstractClientStream stream = AbstractClientStream.newClientStream(req, connection);
+        final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
+        streamChannelBootstrap.open()
+            .addListener(future -> {
+                if (future.isSuccess()) {
+                    final Http2StreamChannel curChannel = (Http2StreamChannel) future.get();
+                    // Start call only when the channel creation is successful
+                    stream.startCall(curChannel, promise);
+                } else {
+                    promise.tryFailure(future.cause());
+                    DefaultFuture2.getFuture(req.getId()).cancel();
+                }
+            });
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 88c534f..91ace1c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -21,24 +21,40 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import io.netty.util.AsciiString;
 import io.netty.util.AttributeKey;
 
-public interface TripleConstant {
-    String CONTENT_PROTO = "application/grpc+proto";
-    String APPLICATION_GRPC = "application/grpc";
-    String TRI_VERSION = "1.0.0";
+public class TripleConstant {
 
-    String SERIALIZATION_KEY = "serialization";
-    String TE_KEY = "te";
+    public static final String CONTENT_PROTO = "application/grpc+proto";
+    public static final String APPLICATION_GRPC = "application/grpc";
+    public static final String TEXT_PLAIN_UTF8 = "text/plain; encoding=utf-8";
+    public static final String TRI_VERSION = "1.0.0";
 
-    AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
+    public static final String SERIALIZATION_KEY = "serialization";
+    public static final String TE_KEY = "te";
 
 
-    AsciiString HTTPS_SCHEME = AsciiString.of("https");
-    AsciiString HTTP_SCHEME = AsciiString.of("http");
+    public static final String HESSIAN4 = "hessian4";
+    public static final String HESSIAN2 = "hessian2";
 
 
-    AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.newInstance(
-        "tri_server_stream");
-    AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
-        "tri_client_stream");
+    public static final String GRPC_BIN_SUFFIX = "-bin";
+
+    public static final AsciiString HTTPS_SCHEME = AsciiString.of("https");
+    public static final AsciiString HTTP_SCHEME = AsciiString.of("http");
+
+    public static final AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
+    public static final AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.valueOf("tri_server_stream");
+    public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.valueOf("tri_client_stream");
+
+    public static final String SUCCESS_RESPONSE_MESSAGE = "OK";
+    public static final String SUCCESS_RESPONSE_STATUS = Integer.toString(GrpcStatus.Code.OK.code);
+
+    public static final Metadata SUCCESS_RESPONSE_META = getSuccessResponseMeta();
+
+    private static Metadata getSuccessResponseMeta() {
+        Metadata metadata = new DefaultMetadata();
+        metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_MESSAGE);
+        metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_STATUS);
+        return metadata;
+    }
 
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index c3e783a..46207bb 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
@@ -37,6 +36,7 @@ import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
 import io.netty.handler.codec.http2.Http2DataFrame;
 import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Frame;
@@ -47,6 +47,7 @@ import io.netty.util.ReferenceCountUtil;
 
 import java.util.List;
 
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 
 public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
@@ -94,12 +95,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         if (LOGGER.isWarnEnabled()) {
             LOGGER.warn("Exception in processing triple message", cause);
         }
-        if (cause instanceof RpcException) {
-            TripleUtil.responseErr(ctx, GrpcStatus.rpcExceptionCodeToGrpc(((RpcException) cause).getCode()));
-        } else {
-            TripleUtil.responseErr(ctx, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                .withDescription("Provider's error:\n" + cause.getMessage()));
-        }
+        GrpcStatus status = GrpcStatus.getStatus(cause, "Provider's error:\n" + cause.getMessage());
+        final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+        serverStream.transportError(status, null, true);
     }
 
     public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception {
@@ -128,23 +126,24 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
     public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
         final Http2Headers headers = msg.headers();
+        ServerTransportObserver transportObserver = new ServerTransportObserver(ctx);
 
         if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
-            TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
+            responsePlainTextError(transportObserver, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
                 GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                     .withDescription(String.format("Method '%s' is not supported", headers.method())));
             return;
         }
 
         if (headers.path() == null) {
-            TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+            responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
                 GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
             return;
         }
 
         final String path = headers.path().toString();
         if (path.charAt(0) != '/') {
-            TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+            responsePlainTextError(transportObserver, HttpResponseStatus.NOT_FOUND.code(),
                 GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
                     .withDescription(String.format("Expected path to start with /: %s", path)));
             return;
@@ -152,15 +151,15 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
         final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
         if (contentType == null) {
-            TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+            responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
                 GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
                     .withDescription("Content-Type is missing from the request"));
             return;
         }
 
         final String contentString = contentType.toString();
-        if (!TripleUtil.supportContentType(contentString)) {
-            TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+        if (!supportContentType(contentString)) {
+            responsePlainTextError(transportObserver, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
                 GrpcStatus.fromCode(Code.INTERNAL.code)
                     .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
             return;
@@ -168,7 +167,8 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
         String[] parts = path.split("/");
         if (parts.length != 3) {
-            TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Bad path format:" + path));
+            responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+                .withDescription("Bad path format:" + path));
             return;
         }
         String serviceName = parts[1];
@@ -177,15 +177,15 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
         final Invoker<?> invoker = getInvoker(headers, serviceName);
         if (invoker == null) {
-            TripleUtil.responseErr(ctx,
-                GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+            responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+                .withDescription("Service not found:" + serviceName));
             return;
         }
         FrameworkServiceRepository repo = frameworkModel.getServiceRepository();
         ProviderModel providerModel = repo.lookupExportedService(invoker.getUrl().getServiceKey());
         if (providerModel == null || providerModel.getServiceModel() == null) {
-            TripleUtil.responseErr(ctx,
-                GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+            responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+                .withDescription("Service not found:" + serviceName));
             return;
         }
 
@@ -201,7 +201,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         } else {
             methodDescriptors = providerModel.getServiceModel().getMethods(methodName);
             if (CollectionUtils.isEmpty(methodDescriptors)) {
-                TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+                responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
                     .withDescription("Method :" + methodName + " not found of service:" + serviceName));
                 return;
             }
@@ -210,6 +210,22 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
                 methodDescriptor = methodDescriptors.get(0);
             }
         }
+
+        Compressor deCompressor = Compressor.NONE;
+        CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
+        if (null != messageEncoding) {
+            String compressorStr = messageEncoding.toString();
+            if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
+                Compressor compressor = Compressor.getCompressor(frameworkModel, compressorStr);
+                if (null == compressor) {
+                    responseErr(transportObserver, GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+                        .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
+                    return;
+                }
+                deCompressor = compressor;
+            }
+        }
+
         boolean isUnary = methodDescriptor != null && methodDescriptor.isUnary();
         final AbstractServerStream stream = AbstractServerStream.newServerStream(invoker.getUrl(), isUnary);
 
@@ -218,27 +234,15 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         stream.service(providerModel.getServiceModel())
             .invoker(invoker)
             .methodName(methodName)
-            .subscribe(new ServerTransportObserver(ctx));
+            .setDeCompressor(deCompressor)
+            .subscribe(transportObserver);
         if (methodDescriptor != null) {
             stream.method(methodDescriptor);
         } else {
             // Then you need to find the corresponding parameter according to the request body
             stream.methods(methodDescriptors);
         }
-        CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
-        if (null != messageEncoding) {
-            String compressorStr = messageEncoding.toString();
-            if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
-                Compressor compressor = Compressor.getCompressor(frameworkModel, compressorStr);
-                if (null == compressor) {
-                    TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
-                        GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
-                            .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
-                } else {
-                    stream.setDeCompressor(compressor);
-                }
-            }
-        }
+
         final TransportObserver observer = stream.asTransportObserver();
         observer.onMetadata(new Http2HeaderMeta(headers), false);
         if (msg.isEndStream()) {
@@ -247,6 +251,34 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         channel.attr(TripleConstant.SERVER_STREAM_KEY).set(stream);
     }
 
+    /**
+     * must start with application/grpc
+     */
+    private boolean supportContentType(String contentType) {
+        if (contentType == null) {
+            return false;
+        }
+        return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
+    }
+
+    private void responsePlainTextError(ServerTransportObserver observer, int code, GrpcStatus status) {
+        Http2Headers headers = new DefaultHttp2Headers(true)
+            .status(String.valueOf(code))
+            .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
+            .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
+            .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.TEXT_PLAIN_UTF8);
+        observer.onMetadata(headers, false);
+        observer.onData(status.description, true);
+    }
+
+    private void responseErr(ServerTransportObserver observer, GrpcStatus status) {
+        Http2Headers trailers = new DefaultHttp2Headers()
+            .status(OK.codeAsText())
+            .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
+            .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
+            .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
+        observer.onMetadata(trailers, true);
+    }
 
     private boolean isEcho(String methodName) {
         return CommonConstants.$ECHO.equals(methodName);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index f1f268c..2fe6687 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -28,14 +28,6 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.rpc.DebugInfo;
 import com.google.rpc.ErrorInfo;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
-import io.netty.handler.codec.http2.Http2Headers;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -51,8 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-
 public class TripleUtil {
     // Some exceptions are not very useful and add too much noise to the log
     private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
@@ -73,36 +63,6 @@ public class TripleUtil {
         return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
     }
 
-    /**
-     * must starts from application/grpc
-     */
-    public static boolean supportContentType(String contentType) {
-        if (contentType == null) {
-            return false;
-        }
-        return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
-    }
-
-    public static void responseErr(ChannelHandlerContext ctx, GrpcStatus status) {
-        Http2Headers trailers = new DefaultHttp2Headers()
-            .status(OK.codeAsText())
-            .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
-            .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
-            .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
-        ctx.writeAndFlush(new DefaultHttp2HeadersFrame(trailers, true));
-    }
-
-    public static void responsePlainTextError(ChannelHandlerContext ctx, int code, GrpcStatus status) {
-        Http2Headers headers = new DefaultHttp2Headers(true)
-            .status("" + code)
-            .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
-            .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
-            .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), "text/plain; encoding=utf-8");
-        ctx.write(new DefaultHttp2HeadersFrame(headers));
-        ByteBuf buf = ByteBufUtil.writeUtf8(ctx.alloc(), status.description);
-        ctx.write(new DefaultHttp2DataFrame(buf, true));
-    }
-
     public static Object unwrapResp(URL url, TripleWrapper.TripleResponseWrapper wrap,
                                     MultipleSerialization serialization) {
         String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
@@ -276,15 +236,15 @@ public class TripleUtil {
     }
 
     public static String convertHessianToWrapper(String serializeType) {
-        if ("hessian2".equals(serializeType)) {
-            return "hessian4";
+        if (TripleConstant.HESSIAN2.equals(serializeType)) {
+            return TripleConstant.HESSIAN4;
         }
         return serializeType;
     }
 
     public static String convertHessianFromWrapper(String serializeType) {
-        if ("hessian4".equals(serializeType)) {
-            return "hessian2";
+        if (TripleConstant.HESSIAN4.equals(serializeType)) {
+            return TripleConstant.HESSIAN2;
         }
         return serializeType;
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index e90f29e..dc98859 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -18,12 +18,10 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
 
 import com.google.protobuf.Any;
 import com.google.rpc.DebugInfo;
@@ -34,14 +32,14 @@ import java.util.Map;
 
 public class UnaryClientStream extends AbstractClientStream implements Stream {
 
-
     protected UnaryClientStream(URL url) {
         super(url);
     }
 
     @Override
-    protected StreamObserver<Object> createStreamObserver() {
-        return new UnaryClientStreamObserverImpl();
+    protected void doOnStartCall() {
+        asStreamObserver().onNext(getRpcInvocation());
+        asStreamObserver().onCompleted();
     }
 
     @Override
@@ -53,45 +51,40 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
 
         @Override
         public void doOnComplete() {
-            execute(() -> {
-                try {
-                    AppResponse result;
-                    if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
-                        final Object resp = deserializeResponse(getData());
-                        result = new AppResponse(resp);
-                    } else {
-                        result = new AppResponse();
-                    }
-                    Response response = new Response(getRequest().getId(), TripleConstant.TRI_VERSION);
-                    result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
-                    response.setResult(result);
-                    DefaultFuture2.received(getConnection(), response);
-                } catch (Exception e) {
-                    final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                        .withCause(e)
-                        .withDescription("Failed to deserialize response");
-                    onError(status);
+            try {
+                AppResponse result;
+                if (!Void.TYPE.equals(getMethodDescriptor().getReturnClass())) {
+                    final Object resp = deserializeResponse(getData());
+                    result = new AppResponse(resp);
+                } else {
+                    result = new AppResponse();
                 }
-            });
+                Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+                result.setObjectAttachments(parseMetadataToAttachmentMap(getTrailers()));
+                response.setResult(result);
+                DefaultFuture2.received(getConnection(), response);
+            } catch (Exception e) {
+                final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                    .withCause(e)
+                    .withDescription("Failed to deserialize response");
+                onError(status);
+            }
         }
 
         @Override
         protected void onError(GrpcStatus status) {
-            // run in callback executor will truncate exception stack and avoid blocking netty's event loop
-            execute(() -> {
-                Response response = new Response(getRequest().getId(), TripleConstant.TRI_VERSION);
-                response.setErrorMessage(status.description);
-                final AppResponse result = new AppResponse();
-                final Metadata trailers = getTrailers() == null ? getHeaders() : getTrailers();
-                result.setException(getThrowable(trailers));
-                result.setObjectAttachments(UnaryClientStream.this.parseMetadataToAttachmentMap(trailers));
-                response.setResult(result);
-                if (!result.hasException()) {
-                    final byte code = GrpcStatus.toDubboStatus(status.code);
-                    response.setStatus(code);
-                }
-                DefaultFuture2.received(getConnection(), response);
-            });
+            Response response = new Response(getRequestId(), TripleConstant.TRI_VERSION);
+            response.setErrorMessage(status.description);
+            final AppResponse result = new AppResponse();
+            final Metadata trailers = getTrailers() == null ? getHeaders() : getTrailers();
+            result.setException(getThrowable(trailers));
+            result.setObjectAttachments(UnaryClientStream.this.parseMetadataToAttachmentMap(trailers));
+            response.setResult(result);
+            if (!result.hasException()) {
+                final byte code = GrpcStatus.toDubboStatus(status.code);
+                response.setStatus(code);
+            }
+            DefaultFuture2.received(getConnection(), response);
         }
 
         private Throwable getThrowable(Metadata metadata) {
@@ -123,26 +116,4 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
             }
         }
     }
-
-
-    private class UnaryClientStreamObserverImpl implements StreamObserver<Object> {
-
-        @Override
-        public void onNext(Object data) {
-            RpcInvocation invocation = (RpcInvocation) data;
-            final Metadata metadata = createRequestMeta(invocation);
-            getTransportSubscriber().onMetadata(metadata, false);
-            final byte[] bytes = encodeRequest(invocation);
-            getTransportSubscriber().onData(bytes, false);
-        }
-
-        @Override
-        public void onError(Throwable throwable) {
-        }
-
-        @Override
-        public void onCompleted() {
-            getTransportSubscriber().onComplete();
-        }
-    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index fde24a0..8c3d98b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -19,21 +19,15 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 
-import io.netty.handler.codec.http.HttpHeaderNames;
-
-import java.util.Map;
 import java.util.concurrent.CompletionStage;
-import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.rpcExceptionCodeToGrpc;
+import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.getStatus;
 
 public class UnaryServerStream extends AbstractServerStream implements Stream {
 
@@ -60,7 +54,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
         @Override
         public void doOnComplete() {
             if (getData() != null) {
-                execute(this::invoke);
+                invoke();
             } else {
                 onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                     .withDescription("Missing request data"));
@@ -68,90 +62,37 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
         }
 
         public void invoke() {
-
-            RpcInvocation invocation;
-            try {
-                invocation = buildInvocation(getHeaders());
-                final Object[] arguments = deserializeRequest(getData());
-                if (arguments != null) {
-                    invocation.setArguments(arguments);
-                } else {
-                    return;
-                }
-            } catch (Throwable t) {
-                LOGGER.warn("Exception processing triple message", t);
-                transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withDescription("Decode request failed:" + t.getMessage()));
+            RpcInvocation invocation = buildInvocation(getHeaders());
+            final Object[] arguments = deserializeRequest(getData());
+            if (arguments == null) {
                 return;
             }
-
+            invocation.setArguments(arguments);
             final Result result = getInvoker().invoke(invocation);
             CompletionStage<Object> future = result.thenApply(Function.identity());
-
-            BiConsumer<Object, Throwable> onComplete = (appResult, t) -> {
-                if (t != null) {
-                    if (t instanceof TimeoutException) {
-                        transportError(GrpcStatus.fromCode(GrpcStatus.Code.DEADLINE_EXCEEDED).withCause(t));
-                    } else {
-                        transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN).withCause(t));
-                    }
+            future.whenComplete((o, throwable) -> {
+                if (throwable != null) {
+                    LOGGER.error("Invoke error", throwable);
+                    transportError(getStatus(throwable));
                     return;
                 }
-                AppResponse response = (AppResponse) appResult;
-                try {
-                    if (response.hasException()) {
-                        final Throwable exception = response.getException();
-                        if (exception instanceof RpcException) {
-                            transportError(rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
-                                .withCause(exception), response.getObjectAttachments());
-                            final GrpcStatus status = rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
-                                .withCause(exception);
-                            transportError(status, response.getObjectAttachments());
-                        } else {
-                            transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
-                                .withCause(exception), response.getObjectAttachments());
-                        }
-                        return;
-                    }
-                    Metadata metadata = createRequestMeta();
-                    metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
-                    getTransportSubscriber().onMetadata(metadata, false);
-
-                    final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
-                    final byte[] data;
-                    try {
-                        ClassLoadUtil.switchContextLoader(
-                            getProviderModel().getServiceInterfaceClass().getClassLoader());
-                        data = encodeResponse(response.getValue());
-                    } finally {
-                        ClassLoadUtil.switchContextLoader(tccl);
-                    }
-                    getTransportSubscriber().onData(data, false);
-
-                    Metadata trailers = new DefaultMetadata()
-                        .put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
-                    final Map<String, Object> attachments = response.getObjectAttachments();
-                    if (attachments != null) {
-                        convertAttachment(trailers, attachments);
-                    }
-                    getTransportSubscriber().onMetadata(trailers, true);
-                } catch (Throwable e) {
-                    LOGGER.warn("Exception processing triple message", e);
-                    if (e instanceof RpcException) {
-                        final GrpcStatus status = rpcExceptionCodeToGrpc(((RpcException) e).getCode())
-                            .withCause(e);
-                        transportError(status, response.getObjectAttachments());
-                    } else {
-                        transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
-                            .withDescription("Exception occurred in provider's execution:" + e.getMessage())
-                            .withCause(e), response.getObjectAttachments());
-                    }
+                AppResponse response = (AppResponse) o;
+                if (response.hasException()) {
+                    transportError(getStatus(response.getException()));
+                    return;
                 }
-            };
-
-            future.whenComplete(onComplete);
+                Metadata metadata = createResponseMeta();
+                getTransportSubscriber().onMetadata(metadata, false);
+                final byte[] data = encodeResponse(response.getValue());
+                if (data == null) {
+                    return;
+                }
+                getTransportSubscriber().onData(data, false);
+                Metadata trailers = TripleConstant.SUCCESS_RESPONSE_META;
+                convertAttachment(trailers, response.getObjectAttachments());
+                getTransportSubscriber().onMetadata(trailers, true);
+            });
             RpcContext.removeContext();
         }
     }
-
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
new file mode 100644
index 0000000..ce6d7be
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TransportStateTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TransportStateTest {
+
+    @Test
+    void allowSendMeta() {
+        TransportState transportState = new TransportState();
+        transportState.setMetaSend();
+        Assertions.assertFalse(transportState.allowSendMeta());
+
+        transportState = new TransportState();
+        transportState.setEndStreamSend();
+        Assertions.assertFalse(transportState.allowSendMeta());
+
+        transportState = new TransportState();
+        transportState.setResetSend();
+        Assertions.assertFalse(transportState.allowSendMeta());
+
+        transportState = new TransportState();
+        transportState.setEndStreamSend();
+        Assertions.assertFalse(transportState.allowSendMeta());
+
+        transportState = new TransportState();
+        transportState.setEndStreamSend();
+        transportState.setMetaSend();
+        Assertions.assertFalse(transportState.allowSendMeta());
+
+        transportState = new TransportState();
+        Assertions.assertTrue(transportState.allowSendMeta());
+    }
+
+    @Test
+    void allowSendData() {
+        TransportState transportState = new TransportState();
+        Assertions.assertFalse(transportState.allowSendData());
+
+        transportState = new TransportState();
+        transportState.setResetSend();
+        Assertions.assertFalse(transportState.allowSendData());
+
+        transportState = new TransportState();
+        transportState.setEndStreamSend();
+        Assertions.assertFalse(transportState.allowSendData());
+
+        transportState = new TransportState();
+        transportState.setMetaSend();
+        Assertions.assertTrue(transportState.allowSendData());
+    }
+
+    @Test
+    void allowSendEndStream() {
+        TransportState transportState = new TransportState();
+        Assertions.assertFalse(transportState.allowSendEndStream());
+
+        transportState = new TransportState();
+        transportState.setResetSend();
+        Assertions.assertFalse(transportState.allowSendEndStream());
+
+        transportState = new TransportState();
+        transportState.setEndStreamSend();
+        Assertions.assertFalse(transportState.allowSendEndStream());
+
+        transportState = new TransportState();
+        transportState.setMetaSend();
+        Assertions.assertTrue(transportState.allowSendEndStream());
+
+    }
+
+    @Test
+    void allowSendReset() {
+        TransportState transportState = new TransportState();
+        transportState.setResetSend();
+        Assertions.assertFalse(transportState.allowSendReset());
+
+        transportState = new TransportState();
+        Assertions.assertTrue(transportState.allowSendReset());
+
+        transportState = new TransportState();
+        transportState.setEndStreamSend();
+        Assertions.assertTrue(transportState.allowSendReset());
+
+        transportState = new TransportState();
+        transportState.setMetaSend();
+        Assertions.assertTrue(transportState.allowSendReset());
+    }
+
+    @Test
+    void serverEndStream() {
+        TransportState transportState = new TransportState();
+        Assertions.assertFalse(transportState.serverSendStreamReceived());
+
+        transportState.setServerEndStreamReceived();
+        Assertions.assertTrue(transportState.serverSendStreamReceived());
+
+    }
+}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 5910b3b..0cddc3c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -28,8 +28,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.when;
 
 class UnaryClientStreamTest {
@@ -46,11 +44,10 @@ class UnaryClientStreamTest {
         // no subscriber
         Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
 
-        TransportObserver transportObserver = Mockito.mock(TransportObserver.class);
+        AbstractChannelTransportObserver transportObserver = Mockito.mock(AbstractChannelTransportObserver.class);
         stream.subscribe(transportObserver);
         // no method descriptor
         Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
-        Mockito.verify(transportObserver).onMetadata(any(), anyBoolean());
 
         MethodDescriptor md = Mockito.mock(MethodDescriptor.class);
         when(md.isNeedWrap()).thenReturn(true);