You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/11 03:25:03 UTC
[dubbo] branch 3.0 updated: Triple transmission supports message
compression (#8817)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 17f85db Triple transmission supports message compression (#8817)
17f85db is described below
commit 17f85dbe8efa8fd67a074b0e10133feabb57e87c
Author: XIACYBING <39...@users.noreply.github.com>
AuthorDate: Mon Oct 11 11:24:54 2021 +0800
Triple transmission supports message compression (#8817)
* [3.0-Triple] Support triple message compress(#8658)
* [3.0-Triple] Rename the compressor key and move to dubbo-rpc-api module
* [3.0-Triple] change the access method of the compressor configuration
* [3.0-Triple] get the configuration of compressor from url and set the scope of compressor extension to framework
* [3.0-Triple] remove unuse import
---
.../main/java/org/apache/dubbo/rpc/Constants.java | 2 +
.../rpc/protocol/tri/AbstractClientStream.java | 12 +++-
.../rpc/protocol/tri/AbstractServerStream.java | 13 +++-
.../dubbo/rpc/protocol/tri/AbstractStream.java | 18 ++++++
.../rpc/protocol/tri/ClientTransportObserver.java | 2 +-
.../apache/dubbo/rpc/protocol/tri/Compressor.java | 56 ++++++++++++++++
.../dubbo/rpc/protocol/tri/GrpcDataDecoder.java | 24 ++++---
.../dubbo/rpc/protocol/tri/GzipCompressor.java | 74 ++++++++++++++++++++++
.../dubbo/rpc/protocol/tri/IdentityCompressor.java | 41 ++++++++++++
.../dubbo/rpc/protocol/tri/ServerStream.java | 2 +-
.../rpc/protocol/tri/ServerTransportObserver.java | 2 +-
.../protocol/tri/TripleClientRequestHandler.java | 12 ++++
.../dubbo/rpc/protocol/tri/TripleHeaderEnum.java | 2 +
.../tri/TripleHttp2ClientResponseHandler.java | 15 +++++
.../tri/TripleHttp2FrameServerHandler.java | 14 ++++
.../apache/dubbo/rpc/protocol/tri/TripleUtil.java | 20 ++++++
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 2 +-
.../org.apache.dubbo.rpc.protocol.tri.Compressor | 2 +
.../dubbo/rpc/protocol/tri/CompressorTest.java | 53 ++++++++++++++++
.../rpc/protocol/tri/UnaryClientStreamTest.java | 2 +-
20 files changed, 352 insertions(+), 16 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index dbdf11a..8bb27cb 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -55,6 +55,8 @@ public interface Constants {
String STUB_EVENT_METHODS_KEY = "dubbo.stub.event.methods";
+ String COMPRESSOR_KEY = "dubbo.rpc.tri.compressor";
+
String PROXY_KEY = "proxy";
String EXECUTES_KEY = "executes";
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 164e799..822471f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -18,10 +18,12 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.triple.TripleWrapper;
@@ -34,6 +36,8 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
public abstract class AbstractClientStream extends AbstractStream implements Stream {
private ConsumerModel consumerModel;
private Connection connection;
@@ -110,7 +114,7 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
}
out = TripleUtil.pack(obj);
- return out;
+ return super.compress(out);
}
private TripleWrapper.TripleRequestWrapper getRequestWrapper(Object value) {
@@ -168,7 +172,11 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
(String) inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
.putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
(String) inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
- .putIfNotNull(TripleHeaderEnum.SERVICE_GROUP.getHeader(), inv.getInvoker().getUrl().getGroup());
+ .putIfNotNull(TripleHeaderEnum.SERVICE_GROUP.getHeader(), inv.getInvoker().getUrl().getGroup())
+ .putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(),
+ ConfigurationUtils.getGlobalConfiguration(inv.getInvoker().getUrl().getScopeModel()).getString(Constants.COMPRESSOR_KEY, DEFAULT_COMPRESSOR))
+ .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(),
+ TripleUtil.calcAcceptEncoding(inv.getInvoker().getUrl()));
final Map<String, Object> attachments = inv.getObjectAttachments();
if (attachments != null) {
convertAttachment(metadata, attachments);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 92127e7..7293701 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -175,6 +175,16 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
}
}
+ /**
+ * create basic meta data
+ */
+ protected Metadata createRequestMeta() {
+ Metadata metadata = new DefaultMetadata();
+ metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), super.getCompressor().getMessageEncoding())
+ .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), TripleUtil.calcAcceptEncoding(invoker.getUrl()));
+ return metadata;
+ }
+
protected byte[] encodeResponse(Object value) {
final com.google.protobuf.Message message;
if (getMethodDescriptor().isNeedWrap()) {
@@ -183,7 +193,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
} else {
message = (Message) value;
}
- return TripleUtil.pack(message);
+ byte[] out = TripleUtil.pack(message);
+ return super.compress(out);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index c6383cd..60a6f97 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -58,6 +58,7 @@ public abstract class AbstractStream implements Stream {
private String serializeType;
private StreamObserver<Object> streamSubscriber;
private TransportObserver transportSubscriber;
+ private Compressor compressor = IdentityCompressor.NONE;
private final CancellationContext cancellationContext;
private volatile boolean cancelled = false;
@@ -203,6 +204,15 @@ public abstract class AbstractStream implements Stream {
this.serviceDescriptor = serviceDescriptor;
}
+ protected AbstractStream setCompressor(Compressor compressor) {
+ this.compressor = compressor;
+ return this;
+ }
+
+ public Compressor getCompressor() {
+ return this.compressor;
+ }
+
public URL getUrl() {
return url;
}
@@ -347,6 +357,14 @@ public abstract class AbstractStream implements Stream {
}
}
+ protected byte[] compress(byte[] data) {
+ return this.getCompressor().compress(data);
+ }
+
+ protected byte[] decompress(byte[] data) {
+ return this.getCompressor().decompress(data);
+ }
+
protected abstract class AbstractTransportObserver implements TransportObserver {
private Metadata headers;
private Metadata trailers;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 78bda21..73090fc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -104,7 +104,7 @@ public class ClientTransportObserver implements TransportObserver {
return;
}
ByteBuf buf = ctx.alloc().buffer();
- buf.writeByte(0);
+ buf.writeByte(TripleUtil.calcCompressFlag(ctx));
buf.writeInt(data.length);
buf.writeBytes(data);
streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
new file mode 100644
index 0000000..e179d7a
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.common.extension.ExtensionScope;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.Constants;
+
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
+/**
+ * compress payload for grpc request, and decompress response payload
+ * Configure it in files, pictures or other configurations that exist in the system properties
+ * Configure {@link Constants#COMPRESSOR_KEY} in dubbo.properties、dubbo.yml or other configuration that exist in the system property
+ */
+@SPI(value = DEFAULT_COMPRESSOR, scope = ExtensionScope.FRAMEWORK)
+public interface Compressor {
+
+ String DEFAULT_COMPRESSOR = "identity";
+
+ /**
+ * message encoding of current compressor
+ * @return return message encoding
+ */
+ String getMessageEncoding();
+
+ /**
+ * compress payload
+ * @param payloadByteArr payload byte array
+ * @return compressed payload byte array
+ */
+ byte[] compress(byte[] payloadByteArr);
+
+ /**
+ * decompress payload
+ * @param payloadByteArr payload byte array
+ * @return decompressed payload byte array
+ */
+ byte[] decompress(byte[] payloadByteArr);
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
index 29d342a..bce6e98 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
@@ -45,14 +45,7 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
.withDescription("gRPC frame header malformed: reserved bits not zero")
.asException();
}
- // compression is not supported yet
- // TODO support it
compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
- if (compressedFlag) {
- throw GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Compression is not supported ")
- .asException();
- }
len = in.readInt();
if (len < 0 || len > maxDataSize) {
@@ -65,7 +58,7 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
case PAYLOAD:
byte[] dst = new byte[len];
in.readBytes(dst);
- out.add(dst);
+ out.add(this.decompressData(dst, ctx));
checkpoint(GrpcDecodeState.HEADER);
break;
default:
@@ -73,6 +66,21 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
}
}
+ private byte[] decompressData(byte[] data, ChannelHandlerContext ctx) {
+ if (!compressedFlag) {
+ return data;
+ }
+
+ Compressor compressor = TripleUtil.getCompressor(ctx);
+ if (null == compressor) {
+ throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
+ .withDescription("gRPC message compressor not found")
+ .asException();
+ }
+
+ return compressor.decompress(data);
+ }
+
enum GrpcDecodeState {
HEADER,
PAYLOAD
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GzipCompressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GzipCompressor.java
new file mode 100644
index 0000000..546b76f
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GzipCompressor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.rpc.RpcException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * gzip compressor
+ */
+public class GzipCompressor implements Compressor {
+
+ @Override
+ public String getMessageEncoding() {
+ return "gzip";
+ }
+
+ @Override
+ public byte[] compress(byte[] payloadByteArr) throws RpcException {
+ if (null == payloadByteArr || 0 == payloadByteArr.length) {
+ return new byte[0];
+ }
+
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteOutStream)) {
+ gzipOutputStream.write(payloadByteArr);
+ } catch (Exception exception) {
+ throw new RpcException(exception);
+ }
+
+ return byteOutStream.toByteArray();
+ }
+
+ @Override
+ public byte[] decompress(byte[] payloadByteArr) throws RpcException {
+ if (null == payloadByteArr || 0 == payloadByteArr.length) {
+ return new byte[0];
+ }
+
+ ByteArrayInputStream byteInStream = new ByteArrayInputStream(payloadByteArr);
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteInStream)) {
+ int readByteNum;
+ byte[] bufferArr = new byte[256];
+ while ((readByteNum = gzipInputStream.read(bufferArr)) >= 0) {
+ byteOutStream.write(bufferArr, 0, readByteNum);
+ }
+ } catch (Exception exception) {
+ throw new RpcException(exception);
+ }
+
+ return byteOutStream.toByteArray();
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
new file mode 100644
index 0000000..a8ac2fc
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/IdentityCompressor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+/**
+ * Default compressor
+ */
+public class IdentityCompressor implements Compressor {
+
+ public static final Compressor NONE = new IdentityCompressor();
+
+ @Override
+ public String getMessageEncoding() {
+ return "identity";
+ }
+
+ @Override
+ public byte[] compress(byte[] payloadByteArr) {
+ return payloadByteArr;
+ }
+
+ @Override
+ public byte[] decompress(byte[] payloadByteArr) {
+ return payloadByteArr;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 666fb2e..ce1c6d7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -45,7 +45,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
@Override
public void onNext(Object data) {
if (!headersSent) {
- getTransportSubscriber().onMetadata(new DefaultMetadata(), false);
+ getTransportSubscriber().onMetadata(createRequestMeta(), false);
headersSent = true;
}
final byte[] bytes = encodeResponse(data);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index 552ce7e..b1326ff 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -82,7 +82,7 @@ public class ServerTransportObserver implements TransportObserver {
return;
}
ByteBuf buf = ctx.alloc().buffer();
- buf.writeByte(0);
+ buf.writeByte(TripleUtil.calcCompressFlag(ctx));
buf.writeInt(data.length);
buf.writeBytes(data);
ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 0149799..7b28c02 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.utils.CollectionUtils;
@@ -40,6 +41,9 @@ import io.netty.channel.ChannelPromise;
import java.util.Arrays;
import java.util.List;
+import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
public class TripleClientRequestHandler extends ChannelDuplexHandler {
private final FrameworkModel frameworkModel;
@@ -72,6 +76,14 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
if (StringUtils.isNotEmpty(ssl)) {
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
}
+
+ // get compressor
+ String compressorStr = ConfigurationUtils
+ .getGlobalConfiguration(url.getScopeModel()).getString(COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
+ Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ stream.setCompressor(compressor);
+ ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+
stream.service(consumerModel)
.connection(Connection.getConnectionFromChannel(ctx.channel()))
.method(methodDescriptor)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 18a3219..0d94ac7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -35,6 +35,8 @@ public enum TripleHeaderEnum {
CONTENT_TYPE_KEY("content-type"),
CONTENT_PROTO("application/grpc+proto"),
APPLICATION_GRPC("application/grpc"),
+ GRPC_ENCODING("grpc-encoding"),
+ GRPC_ACCEPT_ENCODING("grpc-accept-encoding"),
TRICE_ID_KEY("tri-trace-traceid"),
RPC_ID_KEY("tri-trace-rpcid"),
CONSUMER_APP_NAME_KEY("tri-consumer-appname"),
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index f1a2f74..b41d4e7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -69,6 +69,21 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
private void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) {
Http2Headers headers = msg.headers();
AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+
+ CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
+ if (null != messageEncoding) {
+ String compressorStr = messageEncoding.toString();
+ Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
+ .getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ if (null == compressor) {
+ throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
+ .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
+ .asException();
+ } else {
+ clientStream.setCompressor(compressor);
+ ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ }
+ }
final TransportObserver observer = clientStream.asTransportObserver();
observer.onMetadata(new Http2HeaderMeta(headers), false);
if (msg.isEndStream()) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 2c86a95..64443a5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -223,6 +223,20 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
// Then you need to find the corresponding parameter according to the request body
stream.methods(methodDescriptors);
}
+ CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
+ if (null != messageEncoding) {
+ String compressorStr = messageEncoding.toString();
+ Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
+ getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ if (null == compressor) {
+ TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+ .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
+ } else {
+ stream.setCompressor(compressor);
+ ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ }
+ }
final TransportObserver observer = stream.asTransportObserver();
observer.onMetadata(new Http2HeaderMeta(headers), false);
if (msg.isEndStream()) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 72bb526..ad7d809 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -60,6 +60,8 @@ public class TripleUtil {
"tri_server_stream");
public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
"tri_client_stream");
+ public static final AttributeKey<Compressor> COMPRESSOR_KEY = AttributeKey.newInstance(
+ "tri_compressor");
// Some exceptions are not very useful and add too much noise to the log
private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
@@ -95,6 +97,15 @@ public class TripleUtil {
return ctx.channel().attr(TripleUtil.CLIENT_STREAM_KEY).get();
}
+ public static Compressor getCompressor(ChannelHandlerContext ctx) {
+ return ctx.channel().attr(COMPRESSOR_KEY).get();
+ }
+
+ public static int calcCompressFlag(ChannelHandlerContext ctx) {
+ Compressor compressor = getCompressor(ctx);
+ return null == compressor ? 0 : 1;
+ }
+
/**
* must starts from application/grpc
*/
@@ -349,4 +360,13 @@ public class TripleUtil {
return serializeType;
}
+ public static String calcAcceptEncoding(URL url) {
+ Set<String> supportedEncodingSet = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getSupportedExtensions();
+ if (supportedEncodingSet.isEmpty()) {
+ return null;
+ }
+
+ return String.join(",", supportedEncodingSet);
+ }
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 2140463..b5ec80e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -113,7 +113,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
}
return;
}
- Metadata metadata = new DefaultMetadata();
+ Metadata metadata = createRequestMeta();
metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
getTransportSubscriber().onMetadata(metadata, false);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.Compressor b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.Compressor
new file mode 100644
index 0000000..1a485e2
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.Compressor
@@ -0,0 +1,2 @@
+identity=org.apache.dubbo.rpc.protocol.tri.IdentityCompressor
+gzip=org.apache.dubbo.rpc.protocol.tri.GzipCompressor
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/CompressorTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/CompressorTest.java
new file mode 100644
index 0000000..5e9bdac
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/CompressorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class CompressorTest {
+
+ private static final String TEST_STR;
+
+ static {
+ StringBuilder builder = new StringBuilder();
+ int charNum = 1000000;
+ for (int i = 0; i < charNum; i++) {
+ builder.append("a");
+ }
+
+ TEST_STR = builder.toString();
+ }
+
+ @ValueSource(strings = {"gzip", "identity"})
+ @ParameterizedTest
+ void compression(String compressorName) {
+ System.out.println("current compressor is " + compressorName);
+ Compressor compressor = ExtensionLoader.getExtensionLoader(Compressor.class).getExtension(compressorName);
+
+ byte[] compressedByteArr = compressor.compress(TEST_STR.getBytes());
+ System.out.println("compressed byte length:" + compressedByteArr.length);
+
+ byte[] decompressedByteArr = compressor.decompress(compressedByteArr);
+ System.out.println("decompressed byte length:" + decompressedByteArr.length);
+ Assertions.assertEquals(new String(decompressedByteArr), TEST_STR);
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 611bb98..47bc48d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -49,7 +49,7 @@ class UnaryClientStreamTest {
when(inv.getInvoker()).thenReturn(mockInvoker);
// no subscriber
Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
- verify(mockInvoker, times(2)).getUrl();
+ verify(mockInvoker, times(4)).getUrl();
TransportObserver transportObserver = Mockito.mock(TransportObserver.class);
stream.subscribe(transportObserver);