You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/11 03:25:03 UTC

[dubbo] branch 3.0 updated: Triple transmission supports message compression (#8817)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 17f85db  Triple transmission supports message compression (#8817)
17f85db is described below

commit 17f85dbe8efa8fd67a074b0e10133feabb57e87c
Author: XIACYBING <39...@users.noreply.github.com>
AuthorDate: Mon Oct 11 11:24:54 2021 +0800

    Triple transmission supports message compression (#8817)
    
    * [3.0-Triple] Support triple message compress(#8658)
    
    * [3.0-Triple] Rename the compressor key and move to dubbo-rpc-api module
    
    * [3.0-Triple] change the access method of the compressor configuration
    
    * [3.0-Triple] get the configuration of compressor from url and set the scope of compressor extension to framework
    
    * [3.0-Triple] remove unuse import
---
 .../main/java/org/apache/dubbo/rpc/Constants.java  |  2 +
 .../rpc/protocol/tri/AbstractClientStream.java     | 12 +++-
 .../rpc/protocol/tri/AbstractServerStream.java     | 13 +++-
 .../dubbo/rpc/protocol/tri/AbstractStream.java     | 18 ++++++
 .../rpc/protocol/tri/ClientTransportObserver.java  |  2 +-
 .../apache/dubbo/rpc/protocol/tri/Compressor.java  | 56 ++++++++++++++++
 .../dubbo/rpc/protocol/tri/GrpcDataDecoder.java    | 24 ++++---
 .../dubbo/rpc/protocol/tri/GzipCompressor.java     | 74 ++++++++++++++++++++++
 .../dubbo/rpc/protocol/tri/IdentityCompressor.java | 41 ++++++++++++
 .../dubbo/rpc/protocol/tri/ServerStream.java       |  2 +-
 .../rpc/protocol/tri/ServerTransportObserver.java  |  2 +-
 .../protocol/tri/TripleClientRequestHandler.java   | 12 ++++
 .../dubbo/rpc/protocol/tri/TripleHeaderEnum.java   |  2 +
 .../tri/TripleHttp2ClientResponseHandler.java      | 15 +++++
 .../tri/TripleHttp2FrameServerHandler.java         | 14 ++++
 .../apache/dubbo/rpc/protocol/tri/TripleUtil.java  | 20 ++++++
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  |  2 +-
 .../org.apache.dubbo.rpc.protocol.tri.Compressor   |  2 +
 .../dubbo/rpc/protocol/tri/CompressorTest.java     | 53 ++++++++++++++++
 .../rpc/protocol/tri/UnaryClientStreamTest.java    |  2 +-
 20 files changed, 352 insertions(+), 16 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index dbdf11a..8bb27cb 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -55,6 +55,8 @@ public interface Constants {
 
     String STUB_EVENT_METHODS_KEY = "dubbo.stub.event.methods";
 
+    String COMPRESSOR_KEY = "dubbo.rpc.tri.compressor";
+
     String PROXY_KEY = "proxy";
 
     String EXECUTES_KEY = "executes";
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 164e799..822471f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -18,10 +18,12 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.remoting.api.Connection;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
 import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.Constants;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.triple.TripleWrapper;
@@ -34,6 +36,8 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
 public abstract class AbstractClientStream extends AbstractStream implements Stream {
     private ConsumerModel consumerModel;
     private Connection connection;
@@ -110,7 +114,7 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
         }
         out = TripleUtil.pack(obj);
 
-        return out;
+        return super.compress(out);
     }
 
     private TripleWrapper.TripleRequestWrapper getRequestWrapper(Object value) {
@@ -168,7 +172,11 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
                         (String) inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
                 .putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
                         (String) inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
-                .putIfNotNull(TripleHeaderEnum.SERVICE_GROUP.getHeader(), inv.getInvoker().getUrl().getGroup());
+                .putIfNotNull(TripleHeaderEnum.SERVICE_GROUP.getHeader(), inv.getInvoker().getUrl().getGroup())
+                .putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(),
+                    ConfigurationUtils.getGlobalConfiguration(inv.getInvoker().getUrl().getScopeModel()).getString(Constants.COMPRESSOR_KEY, DEFAULT_COMPRESSOR))
+                .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(),
+                    TripleUtil.calcAcceptEncoding(inv.getInvoker().getUrl()));
         final Map<String, Object> attachments = inv.getObjectAttachments();
         if (attachments != null) {
             convertAttachment(metadata, attachments);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 92127e7..7293701 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -175,6 +175,16 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         }
     }
 
+    /**
+     * create basic meta data
+     */
+    protected Metadata createRequestMeta() {
+        Metadata metadata = new DefaultMetadata();
+        metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), super.getCompressor().getMessageEncoding())
+                .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), TripleUtil.calcAcceptEncoding(invoker.getUrl()));
+        return metadata;
+    }
+
     protected byte[] encodeResponse(Object value) {
         final com.google.protobuf.Message message;
         if (getMethodDescriptor().isNeedWrap()) {
@@ -183,7 +193,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         } else {
             message = (Message) value;
         }
-        return TripleUtil.pack(message);
+        byte[] out = TripleUtil.pack(message);
+        return super.compress(out);
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index c6383cd..60a6f97 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -58,6 +58,7 @@ public abstract class AbstractStream implements Stream {
     private String serializeType;
     private StreamObserver<Object> streamSubscriber;
     private TransportObserver transportSubscriber;
+    private Compressor compressor = IdentityCompressor.NONE;
 
     private final CancellationContext cancellationContext;
     private volatile boolean cancelled = false;
@@ -203,6 +204,15 @@ public abstract class AbstractStream implements Stream {
         this.serviceDescriptor = serviceDescriptor;
     }
 
+    protected AbstractStream setCompressor(Compressor compressor) {
+        this.compressor = compressor;
+        return this;
+    }
+
+    public Compressor getCompressor() {
+        return this.compressor;
+    }
+
     public URL getUrl() {
         return url;
     }
@@ -347,6 +357,14 @@ public abstract class AbstractStream implements Stream {
         }
     }
 
+    protected byte[] compress(byte[] data) {
+        return this.getCompressor().compress(data);
+    }
+
+    protected byte[] decompress(byte[] data) {
+        return this.getCompressor().decompress(data);
+    }
+
     protected abstract class AbstractTransportObserver implements TransportObserver {
         private Metadata headers;
         private Metadata trailers;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 78bda21..73090fc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -104,7 +104,7 @@ public class ClientTransportObserver implements TransportObserver {
             return;
         }
         ByteBuf buf = ctx.alloc().buffer();
-        buf.writeByte(0);
+        buf.writeByte(TripleUtil.calcCompressFlag(ctx));
         buf.writeInt(data.length);
         buf.writeBytes(data);
         streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
new file mode 100644
index 0000000..e179d7a
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.common.extension.ExtensionScope;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.Constants;
+
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
+/**
+ * compress payload for grpc request, and decompress response payload
+ * Configure it in files, pictures or other configurations that exist in the system properties
+ * Configure {@link Constants#COMPRESSOR_KEY} in dubbo.properties、dubbo.yml or other configuration that exist in the system property
+ */
+@SPI(value = DEFAULT_COMPRESSOR, scope = ExtensionScope.FRAMEWORK)
+public interface Compressor {
+
+    String DEFAULT_COMPRESSOR = "identity";
+
+    /**
+     * message encoding of current compressor
+     * @return return message encoding
+     */
+    String getMessageEncoding();
+
+    /**
+     * compress payload
+     * @param payloadByteArr payload byte array
+     * @return compressed payload byte array
+     */
+    byte[] compress(byte[] payloadByteArr);
+
+    /**
+     * decompress payload
+     * @param payloadByteArr payload byte array
+     * @return decompressed payload byte array
+     */
+    byte[] decompress(byte[] payloadByteArr);
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
index 29d342a..bce6e98 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
@@ -45,14 +45,7 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
                             .withDescription("gRPC frame header malformed: reserved bits not zero")
                             .asException();
                 }
-                // compression is not supported yet
-                // TODO support it
                 compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
-                if (compressedFlag) {
-                    throw GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                            .withDescription("Compression is not supported ")
-                            .asException();
-                }
 
                 len = in.readInt();
                 if (len < 0 || len > maxDataSize) {
@@ -65,7 +58,7 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
             case PAYLOAD:
                 byte[] dst = new byte[len];
                 in.readBytes(dst);
-                out.add(dst);
+                out.add(this.decompressData(dst, ctx));
                 checkpoint(GrpcDecodeState.HEADER);
                 break;
             default:
@@ -73,6 +66,21 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
         }
     }
 
+    private byte[] decompressData(byte[] data, ChannelHandlerContext ctx) {
+        if (!compressedFlag) {
+            return data;
+        }
+
+        Compressor compressor = TripleUtil.getCompressor(ctx);
+        if (null == compressor) {
+            throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
+                .withDescription("gRPC message compressor not found")
+                .asException();
+        }
+
+        return compressor.decompress(data);
+    }
+
     enum GrpcDecodeState {
         HEADER,
         PAYLOAD
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GzipCompressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GzipCompressor.java
new file mode 100644
index 0000000..546b76f
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GzipCompressor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.RpcException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * gzip compressor
+ */
+public class GzipCompressor implements Compressor {
+
+    @Override
+    public String getMessageEncoding() {
+        return "gzip";
+    }
+
+    @Override
+    public byte[] compress(byte[] payloadByteArr) throws RpcException {
+        if (null == payloadByteArr || 0 == payloadByteArr.length) {
+            return new byte[0];
+        }
+
+        ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteOutStream)) {
+            gzipOutputStream.write(payloadByteArr);
+        } catch (Exception exception) {
+            throw new RpcException(exception);
+        }
+
+        return byteOutStream.toByteArray();
+    }
+
+    @Override
+    public byte[] decompress(byte[] payloadByteArr) throws RpcException {
+        if (null == payloadByteArr || 0 == payloadByteArr.length) {
+            return new byte[0];
+        }
+
+        ByteArrayInputStream byteInStream = new ByteArrayInputStream(payloadByteArr);
+        ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+        try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteInStream)) {
+            int readByteNum;
+            byte[] bufferArr = new byte[256];
+            while ((readByteNum = gzipInputStream.read(bufferArr)) >= 0) {
+                byteOutStream.write(bufferArr, 0, readByteNum);
+            }
+        } catch (Exception exception) {
+            throw new RpcException(exception);
+        }
+
+        return byteOutStream.toByteArray();
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
new file mode 100644
index 0000000..a8ac2fc
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+/**
+ * Default compressor
+ */
+public class IdentityCompressor implements Compressor {
+
+    public static final Compressor NONE = new IdentityCompressor();
+
+    @Override
+    public String getMessageEncoding() {
+        return "identity";
+    }
+
+    @Override
+    public byte[] compress(byte[] payloadByteArr) {
+        return payloadByteArr;
+    }
+
+    @Override
+    public byte[] decompress(byte[] payloadByteArr) {
+        return payloadByteArr;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 666fb2e..ce1c6d7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -45,7 +45,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
         @Override
         public void onNext(Object data) {
             if (!headersSent) {
-                getTransportSubscriber().onMetadata(new DefaultMetadata(), false);
+                getTransportSubscriber().onMetadata(createRequestMeta(), false);
                 headersSent = true;
             }
             final byte[] bytes = encodeResponse(data);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index 552ce7e..b1326ff 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -82,7 +82,7 @@ public class ServerTransportObserver implements TransportObserver {
             return;
         }
         ByteBuf buf = ctx.alloc().buffer();
-        buf.writeByte(0);
+        buf.writeByte(TripleUtil.calcCompressFlag(ctx));
         buf.writeInt(data.length);
         buf.writeBytes(data);
         ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 0149799..7b28c02 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.common.utils.CollectionUtils;
@@ -40,6 +41,9 @@ import io.netty.channel.ChannelPromise;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
 public class TripleClientRequestHandler extends ChannelDuplexHandler {
 
     private final FrameworkModel frameworkModel;
@@ -72,6 +76,14 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
         if (StringUtils.isNotEmpty(ssl)) {
             ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
         }
+
+        // get compressor
+        String compressorStr = ConfigurationUtils
+            .getGlobalConfiguration(url.getScopeModel()).getString(COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+        Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
+        stream.setCompressor(compressor);
+        ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+
         stream.service(consumerModel)
                 .connection(Connection.getConnectionFromChannel(ctx.channel()))
                 .method(methodDescriptor)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 18a3219..0d94ac7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -35,6 +35,8 @@ public enum TripleHeaderEnum {
     CONTENT_TYPE_KEY("content-type"),
     CONTENT_PROTO("application/grpc+proto"),
     APPLICATION_GRPC("application/grpc"),
+    GRPC_ENCODING("grpc-encoding"),
+    GRPC_ACCEPT_ENCODING("grpc-accept-encoding"),
     TRICE_ID_KEY("tri-trace-traceid"),
     RPC_ID_KEY("tri-trace-rpcid"),
     CONSUMER_APP_NAME_KEY("tri-consumer-appname"),
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index f1a2f74..b41d4e7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -69,6 +69,21 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
     private void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) {
         Http2Headers headers = msg.headers();
         AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+
+        CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
+        if (null != messageEncoding) {
+            String compressorStr = messageEncoding.toString();
+            Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
+                .getExtensionLoader(Compressor.class).getExtension(compressorStr);
+            if (null == compressor) {
+                throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
+                    .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
+                    .asException();
+            } else {
+                clientStream.setCompressor(compressor);
+                ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+            }
+        }
         final TransportObserver observer = clientStream.asTransportObserver();
         observer.onMetadata(new Http2HeaderMeta(headers), false);
         if (msg.isEndStream()) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 2c86a95..64443a5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -223,6 +223,20 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             // Then you need to find the corresponding parameter according to the request body
             stream.methods(methodDescriptors);
         }
+        CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
+        if (null != messageEncoding) {
+            String compressorStr = messageEncoding.toString();
+            Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
+                getExtensionLoader(Compressor.class).getExtension(compressorStr);
+            if (null == compressor) {
+                TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+                    GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+                        .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
+            } else {
+                stream.setCompressor(compressor);
+                ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+            }
+        }
         final TransportObserver observer = stream.asTransportObserver();
         observer.onMetadata(new Http2HeaderMeta(headers), false);
         if (msg.isEndStream()) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 72bb526..ad7d809 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -60,6 +60,8 @@ public class TripleUtil {
             "tri_server_stream");
     public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
             "tri_client_stream");
+    public static final AttributeKey<Compressor> COMPRESSOR_KEY = AttributeKey.newInstance(
+            "tri_compressor");
 
     // Some exceptions are not very useful and add too much noise to the log
     private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
@@ -95,6 +97,15 @@ public class TripleUtil {
         return ctx.channel().attr(TripleUtil.CLIENT_STREAM_KEY).get();
     }
 
+    public static Compressor getCompressor(ChannelHandlerContext ctx) {
+        return ctx.channel().attr(COMPRESSOR_KEY).get();
+    }
+
+    public static int calcCompressFlag(ChannelHandlerContext ctx) {
+        Compressor compressor = getCompressor(ctx);
+        return null == compressor ? 0 : 1;
+    }
+
     /**
      * must starts from application/grpc
      */
@@ -349,4 +360,13 @@ public class TripleUtil {
         return serializeType;
     }
 
+    public static String calcAcceptEncoding(URL url) {
+        Set<String> supportedEncodingSet = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getSupportedExtensions();
+        if (supportedEncodingSet.isEmpty()) {
+            return null;
+        }
+
+        return String.join(",", supportedEncodingSet);
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 2140463..b5ec80e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -113,7 +113,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
                         }
                         return;
                     }
-                    Metadata metadata = new DefaultMetadata();
+                    Metadata metadata = createRequestMeta();
                     metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
                     getTransportSubscriber().onMetadata(metadata, false);
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.Compressor b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.Compressor
new file mode 100644
index 0000000..1a485e2
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.Compressor
@@ -0,0 +1,2 @@
+identity=org.apache.dubbo.rpc.protocol.tri.IdentityCompressor
+gzip=org.apache.dubbo.rpc.protocol.tri.GzipCompressor
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/CompressorTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/CompressorTest.java
new file mode 100644
index 0000000..5e9bdac
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/CompressorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class CompressorTest {
+
+    private static final String TEST_STR;
+
+    static {
+        StringBuilder builder = new StringBuilder();
+        int charNum = 1000000;
+        for (int i = 0; i < charNum; i++) {
+            builder.append("a");
+        }
+
+        TEST_STR = builder.toString();
+    }
+
+    @ValueSource(strings = {"gzip", "identity"})
+    @ParameterizedTest
+    void compression(String compressorName) {
+        System.out.println("current compressor is " + compressorName);
+        Compressor compressor = ExtensionLoader.getExtensionLoader(Compressor.class).getExtension(compressorName);
+
+        byte[] compressedByteArr = compressor.compress(TEST_STR.getBytes());
+        System.out.println("compressed byte length:" + compressedByteArr.length);
+
+        byte[] decompressedByteArr = compressor.decompress(compressedByteArr);
+        System.out.println("decompressed byte length:" + decompressedByteArr.length);
+        Assertions.assertEquals(new String(decompressedByteArr), TEST_STR);
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 611bb98..47bc48d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -49,7 +49,7 @@ class UnaryClientStreamTest {
         when(inv.getInvoker()).thenReturn(mockInvoker);
         // no subscriber
         Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
-        verify(mockInvoker, times(2)).getUrl();
+        verify(mockInvoker, times(4)).getUrl();
 
         TransportObserver transportObserver = Mockito.mock(TransportObserver.class);
         stream.subscribe(transportObserver);