You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/08/30 10:51:22 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] Remove operation handler
(#8637)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new dbfad05 [3.0-Triple] Remove operation handler (#8637)
dbfad05 is described below
commit dbfad0504bc6653aaf6827133b337b494dfd6167
Author: GuoHao <gu...@gmail.com>
AuthorDate: Mon Aug 30 18:50:44 2021 +0800
[3.0-Triple] Remove operation handler (#8637)
* Add more comments
* Remove unused operation handler
* Fix UT
---
.../rpc/protocol/tri/AbstractClientStream.java | 6 +--
.../dubbo/rpc/protocol/tri/AbstractStream.java | 54 +++++++++++-----------
.../dubbo/rpc/protocol/tri/ClientStream.java | 8 ++--
.../rpc/protocol/tri/ClientTransportObserver.java | 6 +--
.../dubbo/rpc/protocol/tri/ServerStream.java | 14 +++---
.../rpc/protocol/tri/ServerTransportObserver.java | 8 +---
.../org/apache/dubbo/rpc/protocol/tri/Stream.java | 46 ++++++++++++------
.../dubbo/rpc/protocol/tri/TransportObserver.java | 20 ++------
.../protocol/tri/TripleClientInboundHandler.java | 2 +-
.../tri/TripleHttp2ClientResponseHandler.java | 6 +--
.../tri/TripleHttp2FrameServerHandler.java | 6 +--
.../protocol/tri/TripleServerInboundHandler.java | 2 +-
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 2 +-
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 14 +++---
.../rpc/protocol/tri/UnaryClientStreamTest.java | 2 +-
15 files changed, 99 insertions(+), 97 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 9f676b9..7af76cf 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -169,9 +169,9 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
public void onNext(Object data) {
RpcInvocation invocation = (RpcInvocation) data;
final Metadata metadata = createRequestMeta(invocation);
- getTransportSubscriber().tryOnMetadata(metadata, false);
+ getTransportSubscriber().onMetadata(metadata, false);
final byte[] bytes = encodeRequest(invocation);
- getTransportSubscriber().tryOnData(bytes, false);
+ getTransportSubscriber().onData(bytes, false);
}
@Override
@@ -181,7 +181,7 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
@Override
public void onCompleted() {
- getTransportSubscriber().tryOnComplete();
+ getTransportSubscriber().onComplete();
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 5934871..9b74798 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -56,8 +56,8 @@ public abstract class AbstractStream implements Stream {
ThreadFactory tripleTF = new NamedInternalThreadFactory("tri-callback", true);
for (int i = 0; i < 4; i++) {
final ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 0, TimeUnit.DAYS,
- new LinkedBlockingQueue<>(1024),
- tripleTF, new ThreadPoolExecutor.AbortPolicy());
+ new LinkedBlockingQueue<>(1024),
+ tripleTF, new ThreadPoolExecutor.AbortPolicy());
CALLBACK_EXECUTORS.add(tp);
}
@@ -85,7 +85,7 @@ public abstract class AbstractStream implements Stream {
this.executor = executor;
final String value = url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY);
this.multipleSerialization = ExtensionLoader.getExtensionLoader(MultipleSerialization.class)
- .getExtension(value);
+ .getExtension(value);
this.streamObserver = createStreamObserver();
this.transportObserver = createTransportObserver();
}
@@ -185,34 +185,35 @@ public abstract class AbstractStream implements Stream {
public TransportObserver asTransportObserver() {
return transportObserver;
}
- protected void transportError(GrpcStatus status, Map<String,Object> attachments) {
+
+ protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
// set metadata
Metadata metadata = getMetaData(status);
- getTransportSubscriber().tryOnMetadata(metadata, false);
+ getTransportSubscriber().onMetadata(metadata, false);
// set trailers
Metadata trailers = getTrailers(status);
if (attachments != null) {
convertAttachment(trailers, attachments);
}
- getTransportSubscriber().tryOnMetadata(trailers, true);
+ getTransportSubscriber().onMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Server-Error] " + status.toMessage());
}
}
protected void transportError(GrpcStatus status) {
- transportError(status,null);
+ transportError(status, null);
}
protected void transportError(Throwable throwable) {
GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable, throwable.getMessage());
Metadata metadata = getMetaData(status);
- getTransportSubscriber().tryOnMetadata(metadata, false);
+ getTransportSubscriber().onMetadata(metadata, false);
Metadata trailers = getTrailers(status);
- getTransportSubscriber().tryOnMetadata(trailers, true);
+ getTransportSubscriber().onMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Server-Error] service=" + getServiceDescriptor().getServiceName()
- + " method=" + getMethodName(), throwable);
+ + " method=" + getMethodName(), throwable);
}
}
@@ -237,28 +238,28 @@ public abstract class AbstractStream implements Stream {
Metadata metadata = new DefaultMetadata();
Status.Builder builder = Status.newBuilder()
- .setCode(grpcStatus.code.code)
- .setMessage(getGrpcMessage(grpcStatus));
+ .setCode(grpcStatus.code.code)
+ .setMessage(getGrpcMessage(grpcStatus));
Throwable throwable = grpcStatus.cause;
if (throwable == null) {
return metadata;
}
DebugInfo debugInfo = DebugInfo.newBuilder()
- .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable))
- // can not use now
- // .setDetail(throwable.getMessage())
- .build();
+ .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable))
+ // can not use now
+ // .setDetail(throwable.getMessage())
+ .build();
builder.addDetails(Any.pack(debugInfo));
Status status = builder.build();
metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
- TripleUtil.encodeBase64ASCII(status.toByteArray()));
+ TripleUtil.encodeBase64ASCII(status.toByteArray()));
// only wrapper mode support exception serialization
if (getMethodDescriptor() != null && !getMethodDescriptor().isNeedWrap()) {
return metadata;
}
try {
TripleWrapper.TripleExceptionWrapper exceptionWrapper = TripleUtil.wrapException(getUrl(), throwable,
- getSerializeType(), getMultipleSerialization());
+ getSerializeType(), getMultipleSerialization());
String exceptionStr = TripleUtil.encodeBase64ASCII(exceptionWrapper.toByteArray());
if (!TripleUtil.overEachHeaderListSize(exceptionStr)) {
metadata.put(TripleHeaderEnum.EXCEPTION_TW_BIN.getHeader(), exceptionStr);
@@ -341,7 +342,7 @@ public abstract class AbstractStream implements Stream {
}
@Override
- public void onMetadata(Metadata metadata, boolean endStream, OperationHandler handler) {
+ public void onMetadata(Metadata metadata, boolean endStream) {
if (headers == null) {
headers = metadata;
} else {
@@ -378,24 +379,25 @@ public abstract class AbstractStream implements Stream {
protected abstract void onError(GrpcStatus status);
@Override
- public void onComplete(OperationHandler handler) {
+ public void onComplete() {
final GrpcStatus status = extractStatusFromMeta(getHeaders());
- if (GrpcStatus.Code.isOk(status.code.code)) {
- doOnComplete(handler);
+ if (Code.isOk(status.code.code)) {
+ doOnComplete();
} else {
onError(status);
}
}
- protected abstract void doOnComplete(OperationHandler handler);
+ protected abstract void doOnComplete() ;
+
@Override
- public void onData(byte[] in, boolean endStream, OperationHandler handler) {
+ public void onData(byte[] in, boolean endStream) {
if (data == null) {
this.data = in;
} else {
- handler.operationDone(OperationResult.FAILURE, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription(DUPLICATED_DATA).asException());
+ onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription(DUPLICATED_DATA));
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index a69bd6b..4e31f6e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -37,10 +37,10 @@ public class ClientStream extends AbstractClientStream implements Stream {
if (!metaSent) {
metaSent = true;
final Metadata metadata = createRequestMeta((RpcInvocation) getRequest().getData());
- getTransportSubscriber().tryOnMetadata(metadata, false);
+ getTransportSubscriber().onMetadata(metadata, false);
}
final byte[] bytes = encodeRequest(data);
- getTransportSubscriber().tryOnData(bytes, false);
+ getTransportSubscriber().onData(bytes, false);
}
@Override
@@ -55,7 +55,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
return new AbstractTransportObserver() {
@Override
- public void onData(byte[] data, boolean endStream, OperationHandler handler) {
+ public void onData(byte[] data, boolean endStream) {
execute(() -> {
final Object resp = deserializeResponse(data);
getStreamSubscriber().onNext(resp);
@@ -63,7 +63,7 @@ public class ClientStream extends AbstractClientStream implements Stream {
}
@Override
- public void onComplete(OperationHandler handler) {
+ public void onComplete() {
execute(() -> {
final GrpcStatus status = extractStatusFromMeta(getHeaders());
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index 871e874..cd3f1e6 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -59,7 +59,7 @@ public class ClientTransportObserver implements TransportObserver {
}
@Override
- public void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler) {
+ public void onMetadata(Metadata metadata, boolean endStream) {
if (!headerSent) {
final Http2Headers headers = new DefaultHttp2Headers(true)
.path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
@@ -78,7 +78,7 @@ public class ClientTransportObserver implements TransportObserver {
}
@Override
- public void onData(byte[] data, boolean endStream, Stream.OperationHandler handler) {
+ public void onData(byte[] data, boolean endStream) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeByte(0);
buf.writeInt(data.length);
@@ -92,7 +92,7 @@ public class ClientTransportObserver implements TransportObserver {
}
@Override
- public void onComplete(Stream.OperationHandler handler) {
+ public void onComplete() {
if (!endStreamSent) {
endStreamSent = true;
streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 75e01d9..27e568c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -44,11 +44,11 @@ public class ServerStream extends AbstractServerStream implements Stream {
@Override
public void onNext(Object data) {
if (!headersSent) {
- getTransportSubscriber().tryOnMetadata(new DefaultMetadata(), false);
+ getTransportSubscriber().onMetadata(new DefaultMetadata(), false);
headersSent = true;
}
final byte[] bytes = encodeResponse(data);
- getTransportSubscriber().tryOnData(bytes, false);
+ getTransportSubscriber().onData(bytes, false);
}
@Override
@@ -64,15 +64,15 @@ public class ServerStream extends AbstractServerStream implements Stream {
Metadata metadata = new DefaultMetadata();
metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), "OK");
metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
- getTransportSubscriber().tryOnMetadata(metadata, true);
+ getTransportSubscriber().onMetadata(metadata, true);
}
}
private class StreamTransportObserver extends AbstractTransportObserver implements TransportObserver {
@Override
- public void onMetadata(Metadata metadata, boolean endStream, OperationHandler handler) {
- super.onMetadata(metadata, endStream, handler);
+ public void onMetadata(Metadata metadata, boolean endStream) {
+ super.onMetadata(metadata, endStream);
if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
return;
}
@@ -88,7 +88,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
}
@Override
- public void onData(byte[] in, boolean endStream, OperationHandler handler) {
+ public void onData(byte[] in, boolean endStream) {
try {
if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
RpcInvocation inv = buildInvocation(getHeaders());
@@ -112,7 +112,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
}
@Override
- public void onComplete(OperationHandler handler) {
+ public void onComplete() {
if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
return;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index 0b7f9ff..11b0c2d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -34,7 +34,7 @@ public class ServerTransportObserver implements TransportObserver {
}
@Override
- public void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler) {
+ public void onMetadata(Metadata metadata, boolean endStream) {
final DefaultHttp2Headers headers = new DefaultHttp2Headers(true);
metadata.forEach(e -> {
headers.set(e.getKey(), e.getValue());
@@ -50,15 +50,11 @@ public class ServerTransportObserver implements TransportObserver {
}
@Override
- public void onData(byte[] data, boolean endStream, Stream.OperationHandler handler) {
+ public void onData(byte[] data, boolean endStream) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeByte(0);
buf.writeInt(data.length);
buf.writeBytes(data);
ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false));
}
-
- @Override
- public void onComplete(Stream.OperationHandler handler) {
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
index 4737fa6..de83eab 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Stream.java
@@ -20,32 +20,50 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
+/**
+ * Stream acts as a bi-directional intermediate layer for streaming data processing. It serializes object instances to
+ * byte[] to send to the remote peer, and deserializes byte[] received from it into object instances. {@link #asTransportObserver()}
+ * and {@link #subscribe(TransportObserver)} provide {@link TransportObserver} to send or receive remote data.
+ * {@link #asStreamObserver()} and {@link #subscribe(StreamObserver)} provide {@link StreamObserver}
+ * as API for users fetching/emitting objects from/to remote peer.
+ */
public interface Stream {
Logger LOGGER = LoggerFactory.getLogger(Stream.class);
+ /**
+ * Register an upstream data observer to receive byte[] sent by this stream
+ *
+ * @param observer receives remote byte[] data
+ */
void subscribe(TransportObserver observer);
+ /**
+ * Get a downstream data observer for writing byte[] data to this stream
+ *
+ * @return an observer for writing byte[] to remote peer
+ */
TransportObserver asTransportObserver();
+ /**
+ * Register an upstream data observer to receive the object instances deserialized by this stream
+ *
+ * @param observer receives object instances deserialized from remote byte[] data
+ */
void subscribe(StreamObserver<Object> observer);
+ /**
+ * Get a downstream data observer for transmitting instances to application code
+ *
+ * @return an observer that accepts object instances to be serialized and sent to the remote peer
+ */
StreamObserver<Object> asStreamObserver();
+ /**
+ * Execute a task in stream's executor
+ *
+ * @param runnable task to run
+ */
void execute(Runnable runnable);
- enum OperationResult {
- OK,
- FAILURE,
- NETWORK_FAIL
- }
-
- interface OperationHandler {
-
- /**
- * @param result operation's result
- * @param cause null if the operation succeed
- */
- void operationDone(OperationResult result, Throwable cause);
- }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 6e33c5c..64d833b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -18,25 +18,11 @@
package org.apache.dubbo.rpc.protocol.tri;
public interface TransportObserver {
- Stream.OperationHandler EMPTY_HANDLER = (result, cause) -> {
- };
- default void tryOnMetadata(Metadata metadata, boolean endStream) {
- onMetadata(metadata, endStream, EMPTY_HANDLER);
- }
+ void onMetadata(Metadata metadata, boolean endStream);
- default void tryOnData(byte[] data, boolean endStream) {
- onData(data, endStream, EMPTY_HANDLER);
- }
+ void onData(byte[] data, boolean endStream);
- default void tryOnComplete() {
- onComplete(EMPTY_HANDLER);
- }
-
- void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler);
-
- void onData(byte[] data, boolean endStream, Stream.OperationHandler handler);
-
- void onComplete(Stream.OperationHandler handler);
+ default void onComplete(){}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index 913e91c..73d25f9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -27,7 +27,7 @@ public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
final byte[] data = (byte[]) msg;
if (clientStream != null) {
clientStream.asTransportObserver()
- .tryOnData(data, false);
+ .onData(data, false);
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index 096d412..2d2a4bd 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -60,9 +60,9 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
Http2Headers headers = msg.headers();
AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
final TransportObserver observer = clientStream.asTransportObserver();
- observer.tryOnMetadata(new Http2HeaderMeta(headers), false);
+ observer.onMetadata(new Http2HeaderMeta(headers), false);
if (msg.isEndStream()) {
- observer.tryOnComplete();
+ observer.onComplete();
}
}
@@ -85,7 +85,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
// stream already closed;
if (clientStream != null) {
- clientStream.asTransportObserver().tryOnComplete();
+ clientStream.asTransportObserver().onComplete();
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index cbe37b6..0d71cb9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -86,7 +86,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
if (msg.isEndStream()) {
final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
if (serverStream != null) {
- serverStream.asTransportObserver().tryOnComplete();
+ serverStream.asTransportObserver().onComplete();
}
}
}
@@ -201,9 +201,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
stream.methods(methodDescriptors);
}
final TransportObserver observer = stream.asTransportObserver();
- observer.tryOnMetadata(new Http2HeaderMeta(headers), false);
+ observer.onMetadata(new Http2HeaderMeta(headers), false);
if (msg.isEndStream()) {
- observer.tryOnComplete();
+ observer.onComplete();
}
ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index ba66c86..8c37699 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -26,7 +26,7 @@ public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
final byte[] data = (byte[]) msg;
if (serverStream != null) {
serverStream.asTransportObserver()
- .tryOnData(data, false);
+ .onData(data, false);
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index d0a118f..d8916e1 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -51,7 +51,7 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
private class UnaryClientTransportObserver extends UnaryTransportObserver implements TransportObserver {
@Override
- public void doOnComplete(OperationHandler handler) {
+ public void doOnComplete() {
execute(() -> {
try {
AppResponse result;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index c9de0cd..ca03aa4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -55,13 +55,13 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
}
@Override
- public void doOnComplete(OperationHandler handler) {
- if (getData() == null) {
+ public void doOnComplete() {
+ if (getData() != null) {
+ execute(this::invoke);
+ } else {
onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withDescription("Missing request data"));
- return;
}
- execute(this::invoke);
}
public void invoke() {
@@ -108,7 +108,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
}
Metadata metadata = new DefaultMetadata();
metadata.put(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO);
- getTransportSubscriber().tryOnMetadata(metadata, false);
+ getTransportSubscriber().onMetadata(metadata, false);
final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
final byte[] data;
@@ -119,7 +119,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}
- getTransportSubscriber().tryOnData(data, false);
+ getTransportSubscriber().onData(data, false);
Metadata trailers = new DefaultMetadata()
.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(GrpcStatus.Code.OK.code));
@@ -127,7 +127,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
if (attachments != null) {
convertAttachment(trailers, attachments);
}
- getTransportSubscriber().tryOnMetadata(trailers, true);
+ getTransportSubscriber().onMetadata(trailers, true);
} catch (Throwable e) {
LOGGER.warn("Exception processing triple message", e);
if (e instanceof TripleRpcException) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
index 3af783f..611bb98 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStreamTest.java
@@ -55,7 +55,7 @@ class UnaryClientStreamTest {
stream.subscribe(transportObserver);
// no method descriptor
Assertions.assertThrows(NullPointerException.class, () -> observer.onNext(inv));
- Mockito.verify(transportObserver).tryOnMetadata(any(), anyBoolean());
+ Mockito.verify(transportObserver).onMetadata(any(), anyBoolean());
MethodDescriptor md = Mockito.mock(MethodDescriptor.class);
when(md.isNeedWrap()).thenReturn(true);