You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/18 02:38:50 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] Support streamObserver set compressor alone (#9032)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new d548c9f  [3.0-Triple] Support streamObserver set compressor alone (#9032)
d548c9f is described below

commit d548c9feb28e9ba4d0cc40c70709dc24e952794b
Author: earthchen <yo...@duobei.com>
AuthorDate: Mon Oct 18 10:38:31 2021 +0800

    [3.0-Triple] Support streamObserver set compressor alone (#9032)
    
    * support streamObserver set compressor
    
    * fix rat
    
    * fix comment
    
    * remove CancelableStreamObserver getCancelContext
    
    * add final for CancelableStreamObserver setCancelContext
    
    * remove compressor from channel
    
    * server set compress alone
    
    * Separate Compressor and decompressor
    
    * change client channel stream
    
    * fix review
    
    * fix error
    
    * remove util
    
    * fix error
    
    * fix style
---
 .../rpc/protocol/tri/AbstractClientStream.java     | 32 ++-------
 .../rpc/protocol/tri/AbstractServerStream.java     |  2 +-
 .../dubbo/rpc/protocol/tri/AbstractStream.java     | 66 ++++++++++++++-----
 .../rpc/protocol/tri/CancelableStreamObserver.java | 25 +++++--
 .../dubbo/rpc/protocol/tri/ClientStream.java       | 56 +++++++++++-----
 ...portObserver.java => ClientStreamObserver.java} | 20 +++---
 .../rpc/protocol/tri/ClientTransportObserver.java  | 74 +++++++++++----------
 .../apache/dubbo/rpc/protocol/tri/Compressor.java  | 22 +++++++
 .../dubbo/rpc/protocol/tri/GrpcDataDecoder.java    | 46 ++++++++++---
 .../dubbo/rpc/protocol/tri/ServerStream.java       | 26 ++++++--
 ...portObserver.java => ServerStreamObserver.java} | 20 +++---
 .../rpc/protocol/tri/ServerTransportObserver.java  | 38 ++++++-----
 .../dubbo/rpc/protocol/tri/TransportObserver.java  |  8 +++
 .../protocol/tri/TripleClientInboundHandler.java   |  4 +-
 .../protocol/tri/TripleClientRequestHandler.java   |  8 +--
 .../dubbo/rpc/protocol/tri/TripleConstant.java     |  9 ++-
 .../tri/TripleHttp2ClientResponseHandler.java      | 19 +++---
 .../tri/TripleHttp2FrameServerHandler.java         | 15 ++---
 .../protocol/tri/TripleServerInboundHandler.java   |  2 +-
 .../rpc/protocol/tri/TripleServerInitializer.java  |  4 +-
 .../apache/dubbo/rpc/protocol/tri/TripleUtil.java  | 76 ----------------------
 .../dubbo/rpc/protocol/tri/UnaryClientStream.java  | 25 ++++++-
 22 files changed, 338 insertions(+), 259 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 4c6d4cb..b794edb 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -18,12 +18,10 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.remoting.api.Connection;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
 import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.Constants;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.triple.TripleWrapper;
@@ -36,7 +34,6 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 
-import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 
 public abstract class AbstractClientStream extends AbstractStream implements Stream {
     private ConsumerModel consumerModel;
@@ -63,6 +60,10 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         final CancellationContext cancellationContext = stream.getCancellationContext();
         // for client cancel,send rst frame to server
         cancellationContext.addListener(context -> {
+            if (LOGGER.isWarnEnabled()) {
+                Throwable throwable = cancellationContext.getCancellationCause();
+                LOGGER.warn("Cancel by local throwable is ", throwable);
+            }
             stream.asTransportObserver().onReset(Http2Error.CANCEL);
         });
         return stream;
@@ -173,10 +174,8 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
             .putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
                 (String) inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
             .putIfNotNull(TripleHeaderEnum.SERVICE_GROUP.getHeader(), getUrl().getGroup())
-            .putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(),
-                ConfigurationUtils.getCachedDynamicProperty(inv.getModuleModel(), Constants.COMPRESSOR_KEY, DEFAULT_COMPRESSOR))
-            .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(),
-                TripleUtil.calcAcceptEncoding(getUrl()));
+            .putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), getCompressor().getMessageEncoding())
+            .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
         final Map<String, Object> attachments = inv.getObjectAttachments();
         if (attachments != null) {
             convertAttachment(metadata, attachments);
@@ -194,24 +193,5 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         getCancellationContext().cancel(throwable);
     }
 
-    protected class ClientStreamObserver extends CancelableStreamObserver<Object> {
-
-        @Override
-        public void onNext(Object data) {
-            RpcInvocation invocation = (RpcInvocation) data;
-            final Metadata metadata = createRequestMeta(invocation);
-            getTransportSubscriber().onMetadata(metadata, false);
-            final byte[] bytes = encodeRequest(invocation);
-            getTransportSubscriber().onData(bytes, false);
-        }
 
-        @Override
-        public void onError(Throwable throwable) {
-        }
-
-        @Override
-        public void onCompleted() {
-            getTransportSubscriber().onComplete();
-        }
-    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 7293701..153c556 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -181,7 +181,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
     protected Metadata createRequestMeta() {
         Metadata metadata = new DefaultMetadata();
         metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), super.getCompressor().getMessageEncoding())
-                .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), TripleUtil.calcAcceptEncoding(invoker.getUrl()));
+                .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
         return metadata;
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 60a6f97..c6bdde6 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -59,6 +59,7 @@ public abstract class AbstractStream implements Stream {
     private StreamObserver<Object> streamSubscriber;
     private TransportObserver transportSubscriber;
     private Compressor compressor = IdentityCompressor.NONE;
+    private Compressor deCompressor = IdentityCompressor.NONE;
 
     private final CancellationContext cancellationContext;
     private volatile boolean cancelled = false;
@@ -81,7 +82,7 @@ public abstract class AbstractStream implements Stream {
         this.executor = wrapperSerializingExecutor(sourceExecutor);
         final String value = url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY);
         this.multipleSerialization = url.getOrDefaultFrameworkModel().getExtensionLoader(MultipleSerialization.class)
-                .getExtension(value);
+            .getExtension(value);
         this.cancellationContext = new CancellationContext();
         this.transportObserver = createTransportObserver();
         this.streamObserver = createStreamObserver();
@@ -94,8 +95,8 @@ public abstract class AbstractStream implements Stream {
             return executor;
         }
         ExecutorRepository executorRepository = url.getOrDefaultApplicationModel()
-                .getExtensionLoader(ExecutorRepository.class)
-                .getDefaultExtension();
+            .getExtensionLoader(ExecutorRepository.class)
+            .getDefaultExtension();
         Executor urlExecutor = executorRepository.getExecutor(url);
         if (urlExecutor == null) {
             urlExecutor = executorRepository.createExecutorIfAbsent(url);
@@ -204,8 +205,39 @@ public abstract class AbstractStream implements Stream {
         this.serviceDescriptor = serviceDescriptor;
     }
 
+    /**
+     * set compressor if required
+     *
+     * @param compressor {@link Compressor}
+     */
     protected AbstractStream setCompressor(Compressor compressor) {
-        this.compressor = compressor;
+        // A null compressor is ignored: the current compressor is kept.
+        // Throwing an exception was considered, but for now the null is
+        // handled silently (only logged) and the existing value stays in use.
+        if (compressor != null) {
+            this.compressor = compressor;
+        } else {
+            if (LOGGER.isErrorEnabled()) {
+                LOGGER.error("Compressor is Null, Fall back to default compression." +
+                    " MessageEncoding is " + getCompressor().getMessageEncoding());
+            }
+        }
+        return this;
+    }
+
+
+    protected AbstractStream setDeCompressor(Compressor compressor) {
+        // A null compressor is ignored: the current decompressor is kept.
+        // Throwing an exception was considered, but for now the null is
+        // handled silently (only logged) and the existing value stays in use.
+        if (compressor != null) {
+            this.deCompressor = compressor;
+        } else {
+            if (LOGGER.isErrorEnabled()) {
+                LOGGER.error("Compressor is Null, Fall back to default deCompression." +
+                    " MessageEncoding is " + getDeCompressor().getMessageEncoding());
+            }
+        }
         return this;
     }
 
@@ -213,6 +245,10 @@ public abstract class AbstractStream implements Stream {
         return this.compressor;
     }
 
+    public Compressor getDeCompressor() {
+        return this.deCompressor;
+    }
+
     public URL getUrl() {
         return url;
     }
@@ -252,7 +288,7 @@ public abstract class AbstractStream implements Stream {
         getTransportSubscriber().onMetadata(trailers, true);
         if (LOGGER.isErrorEnabled()) {
             LOGGER.error("[Triple-Server-Error] status=" + status.code.code + " service=" + getServiceDescriptor().getServiceName()
-                    + " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
+                + " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
         }
     }
 
@@ -284,24 +320,24 @@ public abstract class AbstractStream implements Stream {
         metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), getGrpcMessage(grpcStatus));
         metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), String.valueOf(grpcStatus.code.code));
         Status.Builder builder = Status.newBuilder()
-                .setCode(grpcStatus.code.code)
-                .setMessage(getGrpcMessage(grpcStatus));
+            .setCode(grpcStatus.code.code)
+            .setMessage(getGrpcMessage(grpcStatus));
         Throwable throwable = grpcStatus.cause;
         if (throwable == null) {
             Status status = builder.build();
             metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
-                    TripleUtil.encodeBase64ASCII(status.toByteArray()));
+                TripleUtil.encodeBase64ASCII(status.toByteArray()));
             return metadata;
         }
         DebugInfo debugInfo = DebugInfo.newBuilder()
-                .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 10))
-                // can not use now
-                // .setDetail(throwable.getMessage())
-                .build();
+            .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 10))
+            // can not use now
+            // .setDetail(throwable.getMessage())
+            .build();
         builder.addDetails(Any.pack(debugInfo));
         Status status = builder.build();
         metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
-                TripleUtil.encodeBase64ASCII(status.toByteArray()));
+            TripleUtil.encodeBase64ASCII(status.toByteArray()));
         return metadata;
     }
 
@@ -362,7 +398,7 @@ public abstract class AbstractStream implements Stream {
     }
 
     protected byte[] decompress(byte[] data) {
-        return this.getCompressor().decompress(data);
+        return this.getDeCompressor().decompress(data);
     }
 
     protected abstract class AbstractTransportObserver implements TransportObserver {
@@ -438,7 +474,7 @@ public abstract class AbstractStream implements Stream {
                 this.data = in;
             } else {
                 onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                        .withDescription(DUPLICATED_DATA));
+                    .withDescription(DUPLICATED_DATA));
             }
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
index 09eee4e..379b002 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
@@ -17,19 +17,36 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.rpc.CancellationContext;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public abstract class CancelableStreamObserver<T> implements StreamObserver<T> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(CancelableStreamObserver.class);
+
     private CancellationContext cancellationContext;
 
-    public CancellationContext getCancellationContext() {
-        return cancellationContext;
+    private final AtomicBoolean contextSet = new AtomicBoolean(false);
+
+    public CancelableStreamObserver() {
     }
 
-    public void setCancellationContext(CancellationContext cancellationContext) {
-        this.cancellationContext = cancellationContext;
+    public CancelableStreamObserver(CancellationContext cancellationContext) {
+        setCancellationContext(cancellationContext);
+    }
+
+    public final void setCancellationContext(CancellationContext cancellationContext) {
+        if (contextSet.compareAndSet(false, true)) {
+            this.cancellationContext = cancellationContext;
+        } else {
+            if (LOGGER.isWarnEnabled()) {
+                LOGGER.warn("CancellationContext already set,do not repeat the set, ignore this set");
+            }
+        }
     }
 
     public final void cancel(Throwable throwable) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index f2502d3..215eaba 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.CancellationContext;
 import org.apache.dubbo.rpc.RpcInvocation;
 
 public class ClientStream extends AbstractClientStream implements Stream {
@@ -29,27 +30,48 @@ public class ClientStream extends AbstractClientStream implements Stream {
 
     @Override
     protected StreamObserver<Object> createStreamObserver() {
-        ClientStreamObserver clientStreamObserver = new ClientStreamObserver() {
-            boolean metaSent;
+        return new ClientStreamObserverImpl(getCancellationContext());
+    }
 
-            @Override
-            public void onNext(Object data) {
-                if (!metaSent) {
-                    metaSent = true;
-                    final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
-                    getTransportSubscriber().onMetadata(metadata, false);
-                }
-                final byte[] bytes = encodeRequest(data);
-                getTransportSubscriber().onData(bytes, false);
+    private class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+
+        private boolean metaSent;
+
+        public ClientStreamObserverImpl(CancellationContext cancellationContext) {
+            super(cancellationContext);
+            this.metaSent = false;
+        }
+
+        @Override
+        public void onNext(Object data) {
+            if (!metaSent) {
+                metaSent = true;
+                final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
+                getTransportSubscriber().onMetadata(metadata, false);
             }
+            final byte[] bytes = encodeRequest(data);
+            getTransportSubscriber().onData(bytes, false);
+        }
 
-            @Override
-            public void onError(Throwable throwable) {
-                transportError(throwable);
+        @Override
+        public void onError(Throwable throwable) {
+            transportError(throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+            getTransportSubscriber().onComplete();
+        }
+
+        @Override
+        public void setCompression(String compression) {
+            if (metaSent) {
+                cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+                return;
             }
-        };
-        clientStreamObserver.setCancellationContext(getCancellationContext());
-        return clientStreamObserver;
+            Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+            setCompressor(compressor);
+        }
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
similarity index 66%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
index 77e283e..145551a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
@@ -17,18 +17,18 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.handler.codec.http2.Http2Error;
+import org.apache.dubbo.common.stream.StreamObserver;
 
-public interface TransportObserver {
+public interface ClientStreamObserver<T> extends StreamObserver<T> {
 
-    void onMetadata(Metadata metadata, boolean endStream);
+    /**
+     * Sets the compression algorithm to use for the call
+     * <p>
+     * For streams, implementations need to check whether the metadata has already been sent and handle that case accordingly
+     *
+     * @param compression name of the {@link Compressor} extension to use
+     */
+    void setCompression(String compression);
 
-    void onData(byte[] data, boolean endStream);
-
-    default void onReset(Http2Error http2Error) {
-    }
-
-    default void onComplete() {
-    }
 
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 73090fc..3715e98 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -56,9 +56,9 @@ public class ClientTransportObserver implements TransportObserver {
 
         final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
         streamChannel.pipeline().addLast(responseHandler)
-                .addLast(new GrpcDataDecoder(Integer.MAX_VALUE))
-                .addLast(new TripleClientInboundHandler());
-        streamChannel.attr(TripleUtil.CLIENT_STREAM_KEY).set(stream);
+            .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
+            .addLast(new TripleClientInboundHandler());
+        streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
     }
 
     @Override
@@ -70,18 +70,18 @@ public class ClientTransportObserver implements TransportObserver {
             return;
         }
         final Http2Headers headers = new DefaultHttp2Headers(true)
-                .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
-                .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
-                .scheme(SCHEME)
-                .method(HttpMethod.POST.asciiName());
+            .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
+            .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
+            .scheme(SCHEME)
+            .method(HttpMethod.POST.asciiName());
         metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
         headerSent = true;
         streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
-                .addListener(future -> {
-                    if (!future.isSuccess()) {
-                        promise.tryFailure(future.cause());
-                    }
-                });
+            .addListener(future -> {
+                if (!future.isSuccess()) {
+                    promise.tryFailure(future.cause());
+                }
+            });
 
     }
 
@@ -89,13 +89,13 @@ public class ClientTransportObserver implements TransportObserver {
     public void onReset(Http2Error http2Error) {
         resetSent = true;
         streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
-                .addListener(future -> {
-                    if (future.isSuccess()) {
-                        promise.trySuccess();
-                    } else {
-                        promise.tryFailure(future.cause());
-                    }
-                });
+            .addListener(future -> {
+                if (future.isSuccess()) {
+                    promise.trySuccess();
+                } else {
+                    promise.tryFailure(future.cause());
+                }
+            });
     }
 
     @Override
@@ -104,17 +104,17 @@ public class ClientTransportObserver implements TransportObserver {
             return;
         }
         ByteBuf buf = ctx.alloc().buffer();
-        buf.writeByte(TripleUtil.calcCompressFlag(ctx));
+        buf.writeByte(getCompressFlag());
         buf.writeInt(data.length);
         buf.writeBytes(data);
         streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
-                .addListener(future -> {
-                    if (future.isSuccess()) {
-                        promise.trySuccess();
-                    } else {
-                        promise.tryFailure(future.cause());
-                    }
-                });
+            .addListener(future -> {
+                if (future.isSuccess()) {
+                    promise.trySuccess();
+                } else {
+                    promise.tryFailure(future.cause());
+                }
+            });
     }
 
     @Override
@@ -127,12 +127,18 @@ public class ClientTransportObserver implements TransportObserver {
         }
         endStreamSent = true;
         streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
-                .addListener(future -> {
-                    if (future.isSuccess()) {
-                        promise.trySuccess();
-                    } else {
-                        promise.tryFailure(future.cause());
-                    }
-                });
+            .addListener(future -> {
+                if (future.isSuccess()) {
+                    promise.trySuccess();
+                } else {
+                    promise.tryFailure(future.cause());
+                }
+            });
     }
+
+    private int getCompressFlag() {
+        AbstractClientStream stream = streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).get();
+        return TransportObserver.calcCompressFlag(stream.getCompressor());
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
index e179d7a..776fb89 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -20,6 +20,9 @@ package org.apache.dubbo.rpc.protocol.tri;
 import org.apache.dubbo.common.extension.ExtensionScope;
 import org.apache.dubbo.common.extension.SPI;
 import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+import java.util.Set;
 
 import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 
@@ -35,12 +38,14 @@ public interface Compressor {
 
     /**
      * message encoding of current compressor
+     *
      * @return return message encoding
      */
     String getMessageEncoding();
 
     /**
      * compress payload
+     *
      * @param payloadByteArr payload byte array
      * @return compressed payload byte array
      */
@@ -48,9 +53,26 @@ public interface Compressor {
 
     /**
      * decompress payload
+     *
      * @param payloadByteArr payload byte array
      * @return decompressed payload byte array
      */
     byte[] decompress(byte[] payloadByteArr);
 
+
+    static Compressor getCompressor(FrameworkModel frameworkModel, String compressorStr) {
+        if (null == compressorStr) {
+            return null;
+        }
+        return frameworkModel.getExtensionLoader(Compressor.class).getExtension(compressorStr);
+    }
+
+
+    static String getAcceptEncoding(FrameworkModel frameworkModel) {
+        Set<String> supportedEncodingSet = frameworkModel.getExtensionLoader(Compressor.class).getSupportedExtensions();
+        if (supportedEncodingSet.isEmpty()) {
+            return null;
+        }
+        return String.join(",", supportedEncodingSet);
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
index bce6e98..59122fe 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
@@ -16,6 +16,9 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
@@ -23,16 +26,27 @@ import io.netty.handler.codec.ReplayingDecoder;
 import java.util.List;
 
 public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecodeState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GrpcDataDecoder.class);
     private static final int RESERVED_MASK = 0xFE;
     private static final int COMPRESSED_FLAG_MASK = 1;
     private final int maxDataSize;
 
     private int len;
     private boolean compressedFlag;
+    private final boolean client;
 
-    public GrpcDataDecoder(int maxDataSize) {
+    public GrpcDataDecoder(int maxDataSize, boolean client) {
         super(GrpcDecodeState.HEADER);
         this.maxDataSize = maxDataSize;
+        this.client = client;
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        if (LOGGER.isErrorEnabled()) {
+            LOGGER.error("Grpc data read error ", cause);
+        }
+        ctx.close();
     }
 
     @Override
@@ -42,17 +56,17 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
                 int type = in.readByte();
                 if ((type & RESERVED_MASK) != 0) {
                     throw GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                            .withDescription("gRPC frame header malformed: reserved bits not zero")
-                            .asException();
+                        .withDescription("gRPC frame header malformed: reserved bits not zero")
+                        .asException();
                 }
                 compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
 
                 len = in.readInt();
                 if (len < 0 || len > maxDataSize) {
                     throw GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
-                            .withDescription(String.format("gRPC message exceeds maximum size %d: %d",
-                                    maxDataSize, len))
-                            .asException();
+                        .withDescription(String.format("gRPC message exceeds maximum size %d: %d",
+                            maxDataSize, len))
+                        .asException();
                 }
                 checkpoint(GrpcDecodeState.PAYLOAD);
             case PAYLOAD:
@@ -70,14 +84,12 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
         if (!compressedFlag) {
             return data;
         }
-
-        Compressor compressor = TripleUtil.getCompressor(ctx);
+        Compressor compressor = getDeCompressor(ctx, client);
         if (null == compressor) {
             throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
                 .withDescription("gRPC message compressor not found")
                 .asException();
         }
-
         return compressor.decompress(data);
     }
 
@@ -85,4 +97,20 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
         HEADER,
         PAYLOAD
     }
+
+
+    private Compressor getDeCompressor(ChannelHandlerContext ctx, boolean client) {
+        AbstractStream stream = client ? getClientStream(ctx) : getServerStream(ctx);
+        return stream.getDeCompressor();
+    }
+
+    private AbstractClientStream getClientStream(ChannelHandlerContext ctx) {
+        return ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
+    }
+
+    private AbstractServerStream getServerStream(ChannelHandlerContext ctx) {
+        return ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+    }
+
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index ce1c6d7..7958b61 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -31,7 +31,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
 
     @Override
     protected StreamObserver<Object> createStreamObserver() {
-        return new ServerStreamObserver();
+        return new ServerStreamObserverImpl();
     }
 
     @Override
@@ -39,7 +39,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
         return new StreamTransportObserver();
     }
 
-    private class ServerStreamObserver implements StreamObserver<Object> {
+    private class ServerStreamObserverImpl implements ServerStreamObserver<Object> {
         private boolean headersSent;
 
         @Override
@@ -55,8 +55,8 @@ public class ServerStream extends AbstractServerStream implements Stream {
         @Override
         public void onError(Throwable throwable) {
             final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withCause(throwable)
-                    .withDescription("Biz exception");
+                .withCause(throwable)
+                .withDescription("Biz exception");
             transportError(status);
         }
 
@@ -67,6 +67,18 @@ public class ServerStream extends AbstractServerStream implements Stream {
             metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
             getTransportSubscriber().onMetadata(metadata, true);
         }
+
+        @Override
+        public void setCompression(String compression) {
+            if (headersSent) {
+                final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                    .withDescription("Metadata already has been sent,can not set compression");
+                transportError(status);
+                return;
+            }
+            Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+            setCompressor(compressor);
+        }
     }
 
     private class StreamTransportObserver extends AbstractTransportObserver implements TransportObserver {
@@ -102,7 +114,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
                         subscribe((StreamObserver<Object>) result.getValue());
                     } catch (Throwable t) {
                         transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                                .withDescription("Failed to create server's observer"));
+                            .withDescription("Failed to create server's observer"));
                     }
                 } finally {
                     RpcContext.removeCancellationContext();
@@ -122,8 +134,8 @@ public class ServerStream extends AbstractServerStream implements Stream {
                     biStreamOnData(in);
                 } catch (Throwable t) {
                     transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                            .withDescription("Deserialize request failed")
-                            .withCause(t));
+                        .withDescription("Deserialize request failed")
+                        .withCause(t));
                 }
             });
         }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
similarity index 66%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
index 77e283e..9aaba38 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
@@ -17,18 +17,18 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.handler.codec.http2.Http2Error;
+import org.apache.dubbo.common.stream.StreamObserver;
 
-public interface TransportObserver {
+public interface ServerStreamObserver<T> extends StreamObserver<T> {
 
-    void onMetadata(Metadata metadata, boolean endStream);
+    /**
+     * Sets the compression algorithm to use for the call.
+     * <p>
+     * For a stream, the implementation needs to check whether the metadata has already been sent and handle that case accordingly.
+     *
+     * @param compression name of the {@link Compressor} to use
+     */
+    void setCompression(String compression);
 
-    void onData(byte[] data, boolean endStream);
-
-    default void onReset(Http2Error http2Error) {
-    }
-
-    default void onComplete() {
-    }
 
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index b1326ff..b25fef9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -58,22 +58,22 @@ public class ServerTransportObserver implements TransportObserver {
         }
         // If endStream is true, the channel will be closed, so you cannot listen for errors and continue sending any frame
         ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
-                .addListener(future -> {
-                    if (!future.isSuccess()) {
-                        LOGGER.warn("send header error endStream=" + endStream, future.cause());
-                    }
-                });
+            .addListener(future -> {
+                if (!future.isSuccess()) {
+                    LOGGER.warn("send header error endStream=" + endStream, future.cause());
+                }
+            });
     }
 
     @Override
     public void onReset(Http2Error http2Error) {
         resetSent = true;
         ctx.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
-                .addListener(future -> {
-                    if (!future.isSuccess()) {
-                        LOGGER.warn("write reset error", future.cause());
-                    }
-                });
+            .addListener(future -> {
+                if (!future.isSuccess()) {
+                    LOGGER.warn("write reset error", future.cause());
+                }
+            });
     }
 
     @Override
@@ -82,14 +82,20 @@ public class ServerTransportObserver implements TransportObserver {
             return;
         }
         ByteBuf buf = ctx.alloc().buffer();
-        buf.writeByte(TripleUtil.calcCompressFlag(ctx));
+        buf.writeByte(getCompressFlag());
         buf.writeInt(data.length);
         buf.writeBytes(data);
         ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
-                .addListener(future -> {
-                    if (!future.isSuccess()) {
-                        LOGGER.warn("send data error endStream=" + endStream, future.cause());
-                    }
-                });
+            .addListener(future -> {
+                if (!future.isSuccess()) {
+                    LOGGER.warn("send data error endStream=" + endStream, future.cause());
+                }
+            });
+    }
+
+
+    private int getCompressFlag() {
+        AbstractServerStream stream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+        return TransportObserver.calcCompressFlag(stream.getCompressor());
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 77e283e..01415a5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -31,4 +31,12 @@ public interface TransportObserver {
     default void onComplete() {
     }
 
+
+    static int calcCompressFlag(Compressor compressor) {
+        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+            return 0;
+        }
+        return 1;
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index 73d25f9..377b093 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -22,12 +22,12 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+        final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
 
         final byte[] data = (byte[]) msg;
         if (clientStream != null) {
             clientStream.asTransportObserver()
-                    .onData(data, false);
+                .onData(data, false);
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 88d8ea0..83928d2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -76,15 +76,13 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
         if (StringUtils.isNotEmpty(ssl)) {
             ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
         }
-
         // Compressor can not be set by dynamic config
         String compressorStr = ConfigurationUtils
-            .getCachedDynamicProperty(inv.getModuleModel(),COMPRESSOR_KEY,DEFAULT_COMPRESSOR);
+            .getCachedDynamicProperty(inv.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
 
-        if (null != compressorStr && !compressorStr.equals(DEFAULT_COMPRESSOR)) {
-            Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
+        Compressor compressor = Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
+        if (compressor != null) {
             stream.setCompressor(compressor);
-            ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
         }
 
         stream.service(consumerModel)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 586efab..88c534f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.constants.CommonConstants;
 
-import io.netty.handler.codec.http2.Http2CodecUtil;
 import io.netty.util.AsciiString;
 import io.netty.util.AttributeKey;
 
@@ -29,8 +28,6 @@ public interface TripleConstant {
 
     String SERIALIZATION_KEY = "serialization";
     String TE_KEY = "te";
-    // each header size
-    long DEFAULT_HEADER_LIST_SIZE = Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE;
 
     AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
 
@@ -38,4 +35,10 @@ public interface TripleConstant {
     AsciiString HTTPS_SCHEME = AsciiString.of("https");
     AsciiString HTTP_SCHEME = AsciiString.of("http");
 
+
+    AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.newInstance(
+        "tri_server_stream");
+    AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
+        "tri_client_stream");
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index 930fc0a..0a35d2b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -63,28 +63,25 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
     }
 
     private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame) {
-        AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+        final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
         clientStream.cancelByRemote(Http2Error.valueOf(resetFrame.errorCode()));
         ctx.close();
     }
 
     private void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) {
         Http2Headers headers = msg.headers();
-        AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
-
+        final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
         CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
         if (null != messageEncoding) {
             String compressorStr = messageEncoding.toString();
-            if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
-                Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
-                    .getExtensionLoader(Compressor.class).getExtension(compressorStr);
+            if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
+                Compressor compressor = Compressor.getCompressor(clientStream.getUrl().getOrDefaultFrameworkModel(), compressorStr);
                 if (null == compressor) {
                     throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
                         .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
                         .asException();
                 } else {
-                    clientStream.setCompressor(compressor);
-                    ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+                    clientStream.setDeCompressor(compressor);
                 }
             }
         }
@@ -96,8 +93,8 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
         final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
             .withCause(cause);
         Metadata metadata = new DefaultMetadata();
@@ -111,7 +108,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
     public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception {
         super.channelRead(ctx, msg.content());
         if (msg.isEndStream()) {
-            final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+            final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
             // stream already closed;
             if (clientStream != null) {
                 clientStream.asTransportObserver().onComplete();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index eb9505d..1fea532 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -84,7 +84,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
     public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
         Http2Error http2Error = Http2Error.valueOf(frame.errorCode());
-        final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
+        final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
         serverStream.cancelByRemote(http2Error);
         ctx.close();
     }
@@ -106,7 +106,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         super.channelRead(ctx, msg.content());
 
         if (msg.isEndStream()) {
-            final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
+            final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
             if (serverStream != null) {
                 serverStream.asTransportObserver().onComplete();
             }
@@ -228,16 +228,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
         if (null != messageEncoding) {
             String compressorStr = messageEncoding.toString();
-            if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
-                Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
-                    getExtensionLoader(Compressor.class).getExtension(compressorStr);
+            if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
+                Compressor compressor = Compressor.getCompressor(frameworkModel, compressorStr);
                 if (null == compressor) {
                     TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
                         GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
                             .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
                 } else {
-                    stream.setCompressor(compressor);
-                    ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+                    stream.setDeCompressor(compressor);
                 }
             }
         }
@@ -246,8 +244,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         if (msg.isEndStream()) {
             observer.onComplete();
         }
-
-        channel.attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
+        channel.attr(TripleConstant.SERVER_STREAM_KEY).set(stream);
     }
 
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 8c37699..2d16e8e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -22,7 +22,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
+        final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
         final byte[] data = (byte[]) msg;
         if (serverStream != null) {
             serverStream.asTransportObserver()
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
index 0d3b7d1..7120aef 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
@@ -24,7 +24,7 @@ import io.netty.channel.ChannelPipeline;
 
 public class TripleServerInitializer extends ChannelInitializer<Channel> {
 
-    private FrameworkModel frameworkModel;
+    private final FrameworkModel frameworkModel;
 
     public TripleServerInitializer(FrameworkModel frameworkModel) {
         this.frameworkModel = frameworkModel;
@@ -35,7 +35,7 @@ public class TripleServerInitializer extends ChannelInitializer<Channel> {
         final ChannelPipeline p = ch.pipeline();
         p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
         // TODO constraint MAX DATA_SIZE
-        p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE));
+        p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
         p.addLast(new TripleServerInboundHandler());
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 1467c8e..c352b7a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -36,7 +36,6 @@ import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
 import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
 import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.util.AttributeKey;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -55,14 +54,6 @@ import java.util.Set;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 
 public class TripleUtil {
-
-    public static final AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.newInstance(
-        "tri_server_stream");
-    public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
-        "tri_client_stream");
-    public static final AttributeKey<Compressor> COMPRESSOR_KEY = AttributeKey.newInstance(
-        "tri_compressor");
-    public static final String LANGUAGE = "java";
     // Some exceptions are not very useful and add too much noise to the log
     private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
     private static final Set<Class<?>> QUIET_EXCEPTIONS_CLASS = new HashSet<>();
@@ -85,26 +76,6 @@ public class TripleUtil {
         return false;
     }
 
-    public static AbstractServerStream getServerStream(ChannelHandlerContext ctx) {
-        return ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).get();
-    }
-
-    public static AbstractClientStream getClientStream(ChannelHandlerContext ctx) {
-        return ctx.channel().attr(TripleUtil.CLIENT_STREAM_KEY).get();
-    }
-
-    public static Compressor getCompressor(ChannelHandlerContext ctx) {
-        return ctx.channel().attr(COMPRESSOR_KEY).get();
-    }
-
-    public static int calcCompressFlag(ChannelHandlerContext ctx) {
-        Compressor compressor = getCompressor(ctx);
-        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
-            return 0;
-        }
-        return 1;
-    }
-
     /**
      * must starts from application/grpc
      */
@@ -201,44 +172,6 @@ public class TripleUtil {
         }
     }
 
-    public static TripleWrapper.TripleExceptionWrapper wrapException(URL url, Throwable throwable,
-                                                                     String serializeType,
-                                                                     MultipleSerialization serialization) {
-        try {
-            final TripleWrapper.TripleExceptionWrapper.Builder builder = TripleWrapper.TripleExceptionWrapper.newBuilder()
-                .setLanguage(LANGUAGE)
-                .setClassName(throwable.getClass().getName())
-                .setSerialization(serializeType);
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            serialization.serialize(url, serializeType, builder.getClassName(), throwable, bos);
-            builder.setData(ByteString.copyFrom(bos.toByteArray()));
-            bos.close();
-            return builder.build();
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to pack wrapper exception", e);
-        }
-    }
-
-    public static Throwable unWrapException(URL url, TripleWrapper.TripleExceptionWrapper wrap,
-                                            String serializeType,
-                                            MultipleSerialization serialization) {
-        if (wrap == null) {
-            return null;
-        }
-        if (!LANGUAGE.equals(wrap.getLanguage())) {
-            return null;
-        }
-        try {
-            final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
-            Object obj = serialization.deserialize(url, serializeType, wrap.getClassName(), bais);
-            bais.close();
-            return (Throwable) obj;
-        } catch (Exception e) {
-            // if this null ,can get common exception
-            return null;
-        }
-    }
-
 
     public static TripleWrapper.TripleRequestWrapper wrapReq(URL url, String serializeType, Object req,
                                                              String type,
@@ -359,13 +292,4 @@ public class TripleUtil {
         return serializeType;
     }
 
-    public static String calcAcceptEncoding(URL url) {
-        Set<String> supportedEncodingSet = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getSupportedExtensions();
-        if (supportedEncodingSet.isEmpty()) {
-            return null;
-        }
-
-        return String.join(",", supportedEncodingSet);
-    }
-
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 2f431b8..452b514 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
 
 import com.google.protobuf.Any;
 import com.google.rpc.DebugInfo;
@@ -40,7 +41,7 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
 
     @Override
     protected StreamObserver<Object> createStreamObserver() {
-        return new ClientStreamObserver();
+        return new UnaryClientStreamObserverImpl();
     }
 
     @Override
@@ -122,4 +123,26 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
             }
         }
     }
+
+
+    private class UnaryClientStreamObserverImpl implements StreamObserver<Object> {
+
+        @Override
+        public void onNext(Object data) {
+            RpcInvocation invocation = (RpcInvocation) data;
+            final Metadata metadata = createRequestMeta(invocation);
+            getTransportSubscriber().onMetadata(metadata, false);
+            final byte[] bytes = encodeRequest(invocation);
+            getTransportSubscriber().onData(bytes, false);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+        }
+
+        @Override
+        public void onCompleted() {
+            getTransportSubscriber().onComplete();
+        }
+    }
 }