You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/18 02:38:50 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] Support streamObserver set
compressor alone (#9032)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d548c9f [3.0-Triple] Support streamObserver set compressor alone (#9032)
d548c9f is described below
commit d548c9feb28e9ba4d0cc40c70709dc24e952794b
Author: earthchen <yo...@duobei.com>
AuthorDate: Mon Oct 18 10:38:31 2021 +0800
[3.0-Triple] Support streamObserver set compressor alone (#9032)
* support streamObserver set compressor
* fix rat
* fix comment
* remove CancelableStreamObserver getCancelContext
* add final for CancelableStreamObserver setCancelContext
* remove compressor from channel
* server set compress alone
* Separate Compressor and decompressor
* change client channel stream
* fix review
* fix error
* remove util
* fix error
* fix style
---
.../rpc/protocol/tri/AbstractClientStream.java | 32 ++-------
.../rpc/protocol/tri/AbstractServerStream.java | 2 +-
.../dubbo/rpc/protocol/tri/AbstractStream.java | 66 ++++++++++++++-----
.../rpc/protocol/tri/CancelableStreamObserver.java | 25 +++++--
.../dubbo/rpc/protocol/tri/ClientStream.java | 56 +++++++++++-----
...portObserver.java => ClientStreamObserver.java} | 20 +++---
.../rpc/protocol/tri/ClientTransportObserver.java | 74 +++++++++++----------
.../apache/dubbo/rpc/protocol/tri/Compressor.java | 22 +++++++
.../dubbo/rpc/protocol/tri/GrpcDataDecoder.java | 46 ++++++++++---
.../dubbo/rpc/protocol/tri/ServerStream.java | 26 ++++++--
...portObserver.java => ServerStreamObserver.java} | 20 +++---
.../rpc/protocol/tri/ServerTransportObserver.java | 38 ++++++-----
.../dubbo/rpc/protocol/tri/TransportObserver.java | 8 +++
.../protocol/tri/TripleClientInboundHandler.java | 4 +-
.../protocol/tri/TripleClientRequestHandler.java | 8 +--
.../dubbo/rpc/protocol/tri/TripleConstant.java | 9 ++-
.../tri/TripleHttp2ClientResponseHandler.java | 19 +++---
.../tri/TripleHttp2FrameServerHandler.java | 15 ++---
.../protocol/tri/TripleServerInboundHandler.java | 2 +-
.../rpc/protocol/tri/TripleServerInitializer.java | 4 +-
.../apache/dubbo/rpc/protocol/tri/TripleUtil.java | 76 ----------------------
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 25 ++++++-
22 files changed, 338 insertions(+), 259 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 4c6d4cb..b794edb 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -18,12 +18,10 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.triple.TripleWrapper;
@@ -36,7 +34,6 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
-import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
public abstract class AbstractClientStream extends AbstractStream implements Stream {
private ConsumerModel consumerModel;
@@ -63,6 +60,10 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
final CancellationContext cancellationContext = stream.getCancellationContext();
// for client cancel,send rst frame to server
cancellationContext.addListener(context -> {
+ if (LOGGER.isWarnEnabled()) {
+ Throwable throwable = cancellationContext.getCancellationCause();
+ LOGGER.warn("Cancel by local throwable is ", throwable);
+ }
stream.asTransportObserver().onReset(Http2Error.CANCEL);
});
return stream;
@@ -173,10 +174,8 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
.putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
(String) inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
.putIfNotNull(TripleHeaderEnum.SERVICE_GROUP.getHeader(), getUrl().getGroup())
- .putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(),
- ConfigurationUtils.getCachedDynamicProperty(inv.getModuleModel(), Constants.COMPRESSOR_KEY, DEFAULT_COMPRESSOR))
- .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(),
- TripleUtil.calcAcceptEncoding(getUrl()));
+ .putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), getCompressor().getMessageEncoding())
+ .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
final Map<String, Object> attachments = inv.getObjectAttachments();
if (attachments != null) {
convertAttachment(metadata, attachments);
@@ -194,24 +193,5 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
getCancellationContext().cancel(throwable);
}
- protected class ClientStreamObserver extends CancelableStreamObserver<Object> {
-
- @Override
- public void onNext(Object data) {
- RpcInvocation invocation = (RpcInvocation) data;
- final Metadata metadata = createRequestMeta(invocation);
- getTransportSubscriber().onMetadata(metadata, false);
- final byte[] bytes = encodeRequest(invocation);
- getTransportSubscriber().onData(bytes, false);
- }
- @Override
- public void onError(Throwable throwable) {
- }
-
- @Override
- public void onCompleted() {
- getTransportSubscriber().onComplete();
- }
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 7293701..153c556 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -181,7 +181,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
protected Metadata createRequestMeta() {
Metadata metadata = new DefaultMetadata();
metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), super.getCompressor().getMessageEncoding())
- .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), TripleUtil.calcAcceptEncoding(invoker.getUrl()));
+ .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
return metadata;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 60a6f97..c6bdde6 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -59,6 +59,7 @@ public abstract class AbstractStream implements Stream {
private StreamObserver<Object> streamSubscriber;
private TransportObserver transportSubscriber;
private Compressor compressor = IdentityCompressor.NONE;
+ private Compressor deCompressor = IdentityCompressor.NONE;
private final CancellationContext cancellationContext;
private volatile boolean cancelled = false;
@@ -81,7 +82,7 @@ public abstract class AbstractStream implements Stream {
this.executor = wrapperSerializingExecutor(sourceExecutor);
final String value = url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY);
this.multipleSerialization = url.getOrDefaultFrameworkModel().getExtensionLoader(MultipleSerialization.class)
- .getExtension(value);
+ .getExtension(value);
this.cancellationContext = new CancellationContext();
this.transportObserver = createTransportObserver();
this.streamObserver = createStreamObserver();
@@ -94,8 +95,8 @@ public abstract class AbstractStream implements Stream {
return executor;
}
ExecutorRepository executorRepository = url.getOrDefaultApplicationModel()
- .getExtensionLoader(ExecutorRepository.class)
- .getDefaultExtension();
+ .getExtensionLoader(ExecutorRepository.class)
+ .getDefaultExtension();
Executor urlExecutor = executorRepository.getExecutor(url);
if (urlExecutor == null) {
urlExecutor = executorRepository.createExecutorIfAbsent(url);
@@ -204,8 +205,39 @@ public abstract class AbstractStream implements Stream {
this.serviceDescriptor = serviceDescriptor;
}
+ /**
+ * set compressor if required
+ *
+ * @param compressor {@link Compressor}
+ */
protected AbstractStream setCompressor(Compressor compressor) {
- this.compressor = compressor;
+ // A null compressor is ignored: the currently configured compressor is kept.
+ // Throwing an exception was considered as an alternative,
+ // but for now the null is handled silently (only logged) and the existing value is used.
+ if (compressor != null) {
+ this.compressor = compressor;
+ } else {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("Compressor is Null, Fall back to default compression." +
+ " MessageEncoding is " + getCompressor().getMessageEncoding());
+ }
+ }
+ return this;
+ }
+
+
+ protected AbstractStream setDeCompressor(Compressor compressor) {
+ // A null decompressor is ignored: the currently configured decompressor is kept.
+ // Throwing an exception was considered as an alternative,
+ // but for now the null is handled silently (only logged) and the existing value is used.
+ if (compressor != null) {
+ this.deCompressor = compressor;
+ } else {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("Compressor is Null, Fall back to default deCompression." +
+ " MessageEncoding is " + getDeCompressor().getMessageEncoding());
+ }
+ }
return this;
}
@@ -213,6 +245,10 @@ public abstract class AbstractStream implements Stream {
return this.compressor;
}
+ public Compressor getDeCompressor() {
+ return this.deCompressor;
+ }
+
public URL getUrl() {
return url;
}
@@ -252,7 +288,7 @@ public abstract class AbstractStream implements Stream {
getTransportSubscriber().onMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Server-Error] status=" + status.code.code + " service=" + getServiceDescriptor().getServiceName()
- + " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
+ + " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
}
}
@@ -284,24 +320,24 @@ public abstract class AbstractStream implements Stream {
metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), getGrpcMessage(grpcStatus));
metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), String.valueOf(grpcStatus.code.code));
Status.Builder builder = Status.newBuilder()
- .setCode(grpcStatus.code.code)
- .setMessage(getGrpcMessage(grpcStatus));
+ .setCode(grpcStatus.code.code)
+ .setMessage(getGrpcMessage(grpcStatus));
Throwable throwable = grpcStatus.cause;
if (throwable == null) {
Status status = builder.build();
metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
- TripleUtil.encodeBase64ASCII(status.toByteArray()));
+ TripleUtil.encodeBase64ASCII(status.toByteArray()));
return metadata;
}
DebugInfo debugInfo = DebugInfo.newBuilder()
- .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 10))
- // can not use now
- // .setDetail(throwable.getMessage())
- .build();
+ .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 10))
+ // can not use now
+ // .setDetail(throwable.getMessage())
+ .build();
builder.addDetails(Any.pack(debugInfo));
Status status = builder.build();
metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
- TripleUtil.encodeBase64ASCII(status.toByteArray()));
+ TripleUtil.encodeBase64ASCII(status.toByteArray()));
return metadata;
}
@@ -362,7 +398,7 @@ public abstract class AbstractStream implements Stream {
}
protected byte[] decompress(byte[] data) {
- return this.getCompressor().decompress(data);
+ return this.getDeCompressor().decompress(data);
}
protected abstract class AbstractTransportObserver implements TransportObserver {
@@ -438,7 +474,7 @@ public abstract class AbstractStream implements Stream {
this.data = in;
} else {
onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription(DUPLICATED_DATA));
+ .withDescription(DUPLICATED_DATA));
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
index 09eee4e..379b002 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
@@ -17,19 +17,36 @@
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.CancellationContext;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public abstract class CancelableStreamObserver<T> implements StreamObserver<T> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CancelableStreamObserver.class);
+
private CancellationContext cancellationContext;
- public CancellationContext getCancellationContext() {
- return cancellationContext;
+ private final AtomicBoolean contextSet = new AtomicBoolean(false);
+
+ public CancelableStreamObserver() {
}
- public void setCancellationContext(CancellationContext cancellationContext) {
- this.cancellationContext = cancellationContext;
+ public CancelableStreamObserver(CancellationContext cancellationContext) {
+ setCancellationContext(cancellationContext);
+ }
+
+ public final void setCancellationContext(CancellationContext cancellationContext) {
+ if (contextSet.compareAndSet(false, true)) {
+ this.cancellationContext = cancellationContext;
+ } else {
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn("CancellationContext already set,do not repeat the set, ignore this set");
+ }
+ }
}
public final void cancel(Throwable throwable) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index f2502d3..215eaba 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.RpcInvocation;
public class ClientStream extends AbstractClientStream implements Stream {
@@ -29,27 +30,48 @@ public class ClientStream extends AbstractClientStream implements Stream {
@Override
protected StreamObserver<Object> createStreamObserver() {
- ClientStreamObserver clientStreamObserver = new ClientStreamObserver() {
- boolean metaSent;
+ return new ClientStreamObserverImpl(getCancellationContext());
+ }
- @Override
- public void onNext(Object data) {
- if (!metaSent) {
- metaSent = true;
- final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
- getTransportSubscriber().onMetadata(metadata, false);
- }
- final byte[] bytes = encodeRequest(data);
- getTransportSubscriber().onData(bytes, false);
+ private class ClientStreamObserverImpl extends CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+
+ private boolean metaSent;
+
+ public ClientStreamObserverImpl(CancellationContext cancellationContext) {
+ super(cancellationContext);
+ this.metaSent = false;
+ }
+
+ @Override
+ public void onNext(Object data) {
+ if (!metaSent) {
+ metaSent = true;
+ final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
+ getTransportSubscriber().onMetadata(metadata, false);
}
+ final byte[] bytes = encodeRequest(data);
+ getTransportSubscriber().onData(bytes, false);
+ }
- @Override
- public void onError(Throwable throwable) {
- transportError(throwable);
+ @Override
+ public void onError(Throwable throwable) {
+ transportError(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ getTransportSubscriber().onComplete();
+ }
+
+ @Override
+ public void setCompression(String compression) {
+ if (metaSent) {
+ cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
+ return;
}
- };
- clientStreamObserver.setCancellationContext(getCancellationContext());
- return clientStreamObserver;
+ Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+ setCompressor(compressor);
+ }
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
similarity index 66%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
index 77e283e..145551a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
@@ -17,18 +17,18 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http2.Http2Error;
+import org.apache.dubbo.common.stream.StreamObserver;
-public interface TransportObserver {
+public interface ClientStreamObserver<T> extends StreamObserver<T> {
- void onMetadata(Metadata metadata, boolean endStream);
+ /**
+ * Sets the compression algorithm to use for the call.
+ * <p>
+ * Compression can only be chosen before the metadata has been sent; implementations must check this and handle a late call accordingly.
+ *
+ * @param compression name of the {@link Compressor} extension to use
+ */
+ void setCompression(String compression);
- void onData(byte[] data, boolean endStream);
-
- default void onReset(Http2Error http2Error) {
- }
-
- default void onComplete() {
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 73090fc..3715e98 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -56,9 +56,9 @@ public class ClientTransportObserver implements TransportObserver {
final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
streamChannel.pipeline().addLast(responseHandler)
- .addLast(new GrpcDataDecoder(Integer.MAX_VALUE))
- .addLast(new TripleClientInboundHandler());
- streamChannel.attr(TripleUtil.CLIENT_STREAM_KEY).set(stream);
+ .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
+ .addLast(new TripleClientInboundHandler());
+ streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
}
@Override
@@ -70,18 +70,18 @@ public class ClientTransportObserver implements TransportObserver {
return;
}
final Http2Headers headers = new DefaultHttp2Headers(true)
- .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
- .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
- .scheme(SCHEME)
- .method(HttpMethod.POST.asciiName());
+ .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
+ .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
+ .scheme(SCHEME)
+ .method(HttpMethod.POST.asciiName());
metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
headerSent = true;
streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
- .addListener(future -> {
- if (!future.isSuccess()) {
- promise.tryFailure(future.cause());
- }
- });
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ promise.tryFailure(future.cause());
+ }
+ });
}
@@ -89,13 +89,13 @@ public class ClientTransportObserver implements TransportObserver {
public void onReset(Http2Error http2Error) {
resetSent = true;
streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
- .addListener(future -> {
- if (future.isSuccess()) {
- promise.trySuccess();
- } else {
- promise.tryFailure(future.cause());
- }
- });
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ promise.trySuccess();
+ } else {
+ promise.tryFailure(future.cause());
+ }
+ });
}
@Override
@@ -104,17 +104,17 @@ public class ClientTransportObserver implements TransportObserver {
return;
}
ByteBuf buf = ctx.alloc().buffer();
- buf.writeByte(TripleUtil.calcCompressFlag(ctx));
+ buf.writeByte(getCompressFlag());
buf.writeInt(data.length);
buf.writeBytes(data);
streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
- .addListener(future -> {
- if (future.isSuccess()) {
- promise.trySuccess();
- } else {
- promise.tryFailure(future.cause());
- }
- });
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ promise.trySuccess();
+ } else {
+ promise.tryFailure(future.cause());
+ }
+ });
}
@Override
@@ -127,12 +127,18 @@ public class ClientTransportObserver implements TransportObserver {
}
endStreamSent = true;
streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
- .addListener(future -> {
- if (future.isSuccess()) {
- promise.trySuccess();
- } else {
- promise.tryFailure(future.cause());
- }
- });
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ promise.trySuccess();
+ } else {
+ promise.tryFailure(future.cause());
+ }
+ });
}
+
+ private int getCompressFlag() {
+ AbstractClientStream stream = streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).get();
+ return TransportObserver.calcCompressFlag(stream.getCompressor());
+ }
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
index e179d7a..776fb89 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -20,6 +20,9 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.extension.ExtensionScope;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+import java.util.Set;
import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
@@ -35,12 +38,14 @@ public interface Compressor {
/**
* message encoding of current compressor
+ *
* @return return message encoding
*/
String getMessageEncoding();
/**
* compress payload
+ *
* @param payloadByteArr payload byte array
* @return compressed payload byte array
*/
@@ -48,9 +53,26 @@ public interface Compressor {
/**
* decompress payload
+ *
* @param payloadByteArr payload byte array
* @return decompressed payload byte array
*/
byte[] decompress(byte[] payloadByteArr);
+
+ static Compressor getCompressor(FrameworkModel frameworkModel, String compressorStr) {
+ if (null == compressorStr) {
+ return null;
+ }
+ return frameworkModel.getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ }
+
+
+ static String getAcceptEncoding(FrameworkModel frameworkModel) {
+ Set<String> supportedEncodingSet = frameworkModel.getExtensionLoader(Compressor.class).getSupportedExtensions();
+ if (supportedEncodingSet.isEmpty()) {
+ return null;
+ }
+ return String.join(",", supportedEncodingSet);
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
index bce6e98..59122fe 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
@@ -16,6 +16,9 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
@@ -23,16 +26,27 @@ import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecodeState> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GrpcDataDecoder.class);
private static final int RESERVED_MASK = 0xFE;
private static final int COMPRESSED_FLAG_MASK = 1;
private final int maxDataSize;
private int len;
private boolean compressedFlag;
+ private final boolean client;
- public GrpcDataDecoder(int maxDataSize) {
+ public GrpcDataDecoder(int maxDataSize, boolean client) {
super(GrpcDecodeState.HEADER);
this.maxDataSize = maxDataSize;
+ this.client = client;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("Grpc data read error ", cause);
+ }
+ ctx.close();
}
@Override
@@ -42,17 +56,17 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
int type = in.readByte();
if ((type & RESERVED_MASK) != 0) {
throw GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("gRPC frame header malformed: reserved bits not zero")
- .asException();
+ .withDescription("gRPC frame header malformed: reserved bits not zero")
+ .asException();
}
compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
len = in.readInt();
if (len < 0 || len > maxDataSize) {
throw GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
- .withDescription(String.format("gRPC message exceeds maximum size %d: %d",
- maxDataSize, len))
- .asException();
+ .withDescription(String.format("gRPC message exceeds maximum size %d: %d",
+ maxDataSize, len))
+ .asException();
}
checkpoint(GrpcDecodeState.PAYLOAD);
case PAYLOAD:
@@ -70,14 +84,12 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
if (!compressedFlag) {
return data;
}
-
- Compressor compressor = TripleUtil.getCompressor(ctx);
+ Compressor compressor = getDeCompressor(ctx, client);
if (null == compressor) {
throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
.withDescription("gRPC message compressor not found")
.asException();
}
-
return compressor.decompress(data);
}
@@ -85,4 +97,20 @@ public class GrpcDataDecoder extends ReplayingDecoder<GrpcDataDecoder.GrpcDecode
HEADER,
PAYLOAD
}
+
+
+ private Compressor getDeCompressor(ChannelHandlerContext ctx, boolean client) {
+ AbstractStream stream = client ? getClientStream(ctx) : getServerStream(ctx);
+ return stream.getDeCompressor();
+ }
+
+ private AbstractClientStream getClientStream(ChannelHandlerContext ctx) {
+ return ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
+ }
+
+ private AbstractServerStream getServerStream(ChannelHandlerContext ctx) {
+ return ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+ }
+
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index ce1c6d7..7958b61 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -31,7 +31,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
@Override
protected StreamObserver<Object> createStreamObserver() {
- return new ServerStreamObserver();
+ return new ServerStreamObserverImpl();
}
@Override
@@ -39,7 +39,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
return new StreamTransportObserver();
}
- private class ServerStreamObserver implements StreamObserver<Object> {
+ private class ServerStreamObserverImpl implements ServerStreamObserver<Object> {
private boolean headersSent;
@Override
@@ -55,8 +55,8 @@ public class ServerStream extends AbstractServerStream implements Stream {
@Override
public void onError(Throwable throwable) {
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(throwable)
- .withDescription("Biz exception");
+ .withCause(throwable)
+ .withDescription("Biz exception");
transportError(status);
}
@@ -67,6 +67,18 @@ public class ServerStream extends AbstractServerStream implements Stream {
metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
getTransportSubscriber().onMetadata(metadata, true);
}
+
+ @Override
+ public void setCompression(String compression) {
+ if (headersSent) {
+ final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription("Metadata already has been sent,can not set compression");
+ transportError(status);
+ return;
+ }
+ Compressor compressor = Compressor.getCompressor(getUrl().getOrDefaultFrameworkModel(), compression);
+ setCompressor(compressor);
+ }
}
private class StreamTransportObserver extends AbstractTransportObserver implements TransportObserver {
@@ -102,7 +114,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
subscribe((StreamObserver<Object>) result.getValue());
} catch (Throwable t) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Failed to create server's observer"));
+ .withDescription("Failed to create server's observer"));
}
} finally {
RpcContext.removeCancellationContext();
@@ -122,8 +134,8 @@ public class ServerStream extends AbstractServerStream implements Stream {
biStreamOnData(in);
} catch (Throwable t) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Deserialize request failed")
- .withCause(t));
+ .withDescription("Deserialize request failed")
+ .withCause(t));
}
});
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
similarity index 66%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
index 77e283e..9aaba38 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
@@ -17,18 +17,18 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http2.Http2Error;
+import org.apache.dubbo.common.stream.StreamObserver;
-public interface TransportObserver {
+public interface ServerStreamObserver<T> extends StreamObserver<T> {
- void onMetadata(Metadata metadata, boolean endStream);
+ /**
+ * Sets the compression algorithm to use for the call.
+ * <p>
+ * For a stream, setting the compression requires checking whether the metadata has already been sent and handling that case accordingly.
+ *
+ * @param compression {@link Compressor}
+ */
+ void setCompression(String compression);
- void onData(byte[] data, boolean endStream);
-
- default void onReset(Http2Error http2Error) {
- }
-
- default void onComplete() {
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index b1326ff..b25fef9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -58,22 +58,22 @@ public class ServerTransportObserver implements TransportObserver {
}
// If endStream is true, the channel will be closed, so you cannot listen for errors and continue sending any frame
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
- .addListener(future -> {
- if (!future.isSuccess()) {
- LOGGER.warn("send header error endStream=" + endStream, future.cause());
- }
- });
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOGGER.warn("send header error endStream=" + endStream, future.cause());
+ }
+ });
}
@Override
public void onReset(Http2Error http2Error) {
resetSent = true;
ctx.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
- .addListener(future -> {
- if (!future.isSuccess()) {
- LOGGER.warn("write reset error", future.cause());
- }
- });
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOGGER.warn("write reset error", future.cause());
+ }
+ });
}
@Override
@@ -82,14 +82,20 @@ public class ServerTransportObserver implements TransportObserver {
return;
}
ByteBuf buf = ctx.alloc().buffer();
- buf.writeByte(TripleUtil.calcCompressFlag(ctx));
+ buf.writeByte(getCompressFlag());
buf.writeInt(data.length);
buf.writeBytes(data);
ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
- .addListener(future -> {
- if (!future.isSuccess()) {
- LOGGER.warn("send data error endStream=" + endStream, future.cause());
- }
- });
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOGGER.warn("send data error endStream=" + endStream, future.cause());
+ }
+ });
+ }
+
+
+ private int getCompressFlag() {
+ AbstractServerStream stream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
+ return TransportObserver.calcCompressFlag(stream.getCompressor());
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 77e283e..01415a5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -31,4 +31,12 @@ public interface TransportObserver {
default void onComplete() {
}
+
+ static int calcCompressFlag(Compressor compressor) {
+ if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+ return 0;
+ }
+ return 1;
+ }
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index 73d25f9..377b093 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -22,12 +22,12 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+ final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
final byte[] data = (byte[]) msg;
if (clientStream != null) {
clientStream.asTransportObserver()
- .onData(data, false);
+ .onData(data, false);
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 88d8ea0..83928d2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -76,15 +76,13 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
if (StringUtils.isNotEmpty(ssl)) {
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
}
-
// Compressor can not be set by dynamic config
String compressorStr = ConfigurationUtils
- .getCachedDynamicProperty(inv.getModuleModel(),COMPRESSOR_KEY,DEFAULT_COMPRESSOR);
+ .getCachedDynamicProperty(inv.getModuleModel(), COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
- if (null != compressorStr && !compressorStr.equals(DEFAULT_COMPRESSOR)) {
- Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ Compressor compressor = Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
+ if (compressor != null) {
stream.setCompressor(compressor);
- ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
}
stream.service(consumerModel)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 586efab..88c534f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.constants.CommonConstants;
-import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
@@ -29,8 +28,6 @@ public interface TripleConstant {
String SERIALIZATION_KEY = "serialization";
String TE_KEY = "te";
- // each header size
- long DEFAULT_HEADER_LIST_SIZE = Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE;
AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
@@ -38,4 +35,10 @@ public interface TripleConstant {
AsciiString HTTPS_SCHEME = AsciiString.of("https");
AsciiString HTTP_SCHEME = AsciiString.of("http");
+
+ AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.newInstance(
+ "tri_server_stream");
+ AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
+ "tri_client_stream");
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index 930fc0a..0a35d2b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -63,28 +63,25 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
}
private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame) {
- AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+ final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
clientStream.cancelByRemote(Http2Error.valueOf(resetFrame.errorCode()));
ctx.close();
}
private void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) {
Http2Headers headers = msg.headers();
- AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
-
+ final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
if (null != messageEncoding) {
String compressorStr = messageEncoding.toString();
- if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
- Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
- .getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
+ Compressor compressor = Compressor.getCompressor(clientStream.getUrl().getOrDefaultFrameworkModel(), compressorStr);
if (null == compressor) {
throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
.withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
.asException();
} else {
- clientStream.setCompressor(compressor);
- ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ clientStream.setDeCompressor(compressor);
}
}
}
@@ -96,8 +93,8 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withCause(cause);
Metadata metadata = new DefaultMetadata();
@@ -111,7 +108,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception {
super.channelRead(ctx, msg.content());
if (msg.isEndStream()) {
- final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+ final AbstractClientStream clientStream = ctx.channel().attr(TripleConstant.CLIENT_STREAM_KEY).get();
// stream already closed;
if (clientStream != null) {
clientStream.asTransportObserver().onComplete();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index eb9505d..1fea532 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -84,7 +84,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
Http2Error http2Error = Http2Error.valueOf(frame.errorCode());
- final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
+ final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
serverStream.cancelByRemote(http2Error);
ctx.close();
}
@@ -106,7 +106,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
super.channelRead(ctx, msg.content());
if (msg.isEndStream()) {
- final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
+ final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
if (serverStream != null) {
serverStream.asTransportObserver().onComplete();
}
@@ -228,16 +228,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
if (null != messageEncoding) {
String compressorStr = messageEncoding.toString();
- if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
- Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
- getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ if (!DEFAULT_COMPRESSOR.equals(compressorStr)) {
+ Compressor compressor = Compressor.getCompressor(frameworkModel, compressorStr);
if (null == compressor) {
TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
.withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
} else {
- stream.setCompressor(compressor);
- ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ stream.setDeCompressor(compressor);
}
}
}
@@ -246,8 +244,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
if (msg.isEndStream()) {
observer.onComplete();
}
-
- channel.attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
+ channel.attr(TripleConstant.SERVER_STREAM_KEY).set(stream);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 8c37699..2d16e8e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -22,7 +22,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
+ final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
final byte[] data = (byte[]) msg;
if (serverStream != null) {
serverStream.asTransportObserver()
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
index 0d3b7d1..7120aef 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
@@ -24,7 +24,7 @@ import io.netty.channel.ChannelPipeline;
public class TripleServerInitializer extends ChannelInitializer<Channel> {
- private FrameworkModel frameworkModel;
+ private final FrameworkModel frameworkModel;
public TripleServerInitializer(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
@@ -35,7 +35,7 @@ public class TripleServerInitializer extends ChannelInitializer<Channel> {
final ChannelPipeline p = ch.pipeline();
p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
// TODO constraint MAX DATA_SIZE
- p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE));
+ p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
p.addLast(new TripleServerInboundHandler());
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 1467c8e..c352b7a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -36,7 +36,6 @@ import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.util.AttributeKey;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -55,14 +54,6 @@ import java.util.Set;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
public class TripleUtil {
-
- public static final AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.newInstance(
- "tri_server_stream");
- public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.newInstance(
- "tri_client_stream");
- public static final AttributeKey<Compressor> COMPRESSOR_KEY = AttributeKey.newInstance(
- "tri_compressor");
- public static final String LANGUAGE = "java";
// Some exceptions are not very useful and add too much noise to the log
private static final Set<String> QUIET_EXCEPTIONS = new HashSet<>();
private static final Set<Class<?>> QUIET_EXCEPTIONS_CLASS = new HashSet<>();
@@ -85,26 +76,6 @@ public class TripleUtil {
return false;
}
- public static AbstractServerStream getServerStream(ChannelHandlerContext ctx) {
- return ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).get();
- }
-
- public static AbstractClientStream getClientStream(ChannelHandlerContext ctx) {
- return ctx.channel().attr(TripleUtil.CLIENT_STREAM_KEY).get();
- }
-
- public static Compressor getCompressor(ChannelHandlerContext ctx) {
- return ctx.channel().attr(COMPRESSOR_KEY).get();
- }
-
- public static int calcCompressFlag(ChannelHandlerContext ctx) {
- Compressor compressor = getCompressor(ctx);
- if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
- return 0;
- }
- return 1;
- }
-
/**
* must starts from application/grpc
*/
@@ -201,44 +172,6 @@ public class TripleUtil {
}
}
- public static TripleWrapper.TripleExceptionWrapper wrapException(URL url, Throwable throwable,
- String serializeType,
- MultipleSerialization serialization) {
- try {
- final TripleWrapper.TripleExceptionWrapper.Builder builder = TripleWrapper.TripleExceptionWrapper.newBuilder()
- .setLanguage(LANGUAGE)
- .setClassName(throwable.getClass().getName())
- .setSerialization(serializeType);
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- serialization.serialize(url, serializeType, builder.getClassName(), throwable, bos);
- builder.setData(ByteString.copyFrom(bos.toByteArray()));
- bos.close();
- return builder.build();
- } catch (IOException e) {
- throw new RuntimeException("Failed to pack wrapper exception", e);
- }
- }
-
- public static Throwable unWrapException(URL url, TripleWrapper.TripleExceptionWrapper wrap,
- String serializeType,
- MultipleSerialization serialization) {
- if (wrap == null) {
- return null;
- }
- if (!LANGUAGE.equals(wrap.getLanguage())) {
- return null;
- }
- try {
- final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
- Object obj = serialization.deserialize(url, serializeType, wrap.getClassName(), bais);
- bais.close();
- return (Throwable) obj;
- } catch (Exception e) {
- // if this null ,can get common exception
- return null;
- }
- }
-
public static TripleWrapper.TripleRequestWrapper wrapReq(URL url, String serializeType, Object req,
String type,
@@ -359,13 +292,4 @@ public class TripleUtil {
return serializeType;
}
- public static String calcAcceptEncoding(URL url) {
- Set<String> supportedEncodingSet = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getSupportedExtensions();
- if (supportedEncodingSet.isEmpty()) {
- return null;
- }
-
- return String.join(",", supportedEncodingSet);
- }
-
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 2f431b8..452b514 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
@@ -40,7 +41,7 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
@Override
protected StreamObserver<Object> createStreamObserver() {
- return new ClientStreamObserver();
+ return new UnaryClientStreamObserverImpl();
}
@Override
@@ -122,4 +123,26 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
}
}
}
+
+
+ private class UnaryClientStreamObserverImpl implements StreamObserver<Object> {
+
+ @Override
+ public void onNext(Object data) {
+ RpcInvocation invocation = (RpcInvocation) data;
+ final Metadata metadata = createRequestMeta(invocation);
+ getTransportSubscriber().onMetadata(metadata, false);
+ final byte[] bytes = encodeRequest(invocation);
+ getTransportSubscriber().onData(bytes, false);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ getTransportSubscriber().onComplete();
+ }
+ }
}