You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/27 06:21:35 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] Fix missing response and
duplicate trailers (#9140)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 448da6e [3.0-Triple] Fix missing response and duplicate trailers (#9140)
448da6e is described below
commit 448da6ea8696d8c73cc2b83dfd5401bd83cc3b06
Author: GuoHao <gu...@gmail.com>
AuthorDate: Wed Oct 27 14:21:12 2021 +0800
[3.0-Triple] Fix missing response and duplicate trailers (#9140)
* Fix missing response and duplicate trailers
* Some minor fix
---
.../remoting/exchange/support/DefaultFuture2.java | 61 ++++++++++------------
.../rpc/protocol/tri/AbstractClientStream.java | 5 --
.../dubbo/rpc/protocol/tri/PathResolver.java | 3 ++
...va => ServerUnaryInboundTransportObserver.java} | 2 +-
.../protocol/tri/TripleClientRequestHandler.java | 18 ++++---
.../dubbo/rpc/protocol/tri/TripleConstant.java | 5 +-
.../tri/TripleHttp2FrameServerHandler.java | 8 +--
.../rpc/protocol/tri/TripleHttp2Protocol.java | 14 ++++-
.../protocol/tri/TripleServerInboundHandler.java | 2 +-
.../rpc/protocol/tri/TripleServerInitializer.java | 41 ---------------
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 2 +-
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 5 +-
12 files changed, 66 insertions(+), 100 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
index de8baf1..e9eb4f3 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
@@ -43,28 +43,26 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * DefaultFuture.
+ * DefaultFuture2.
+ * This class is duplicated with {@link DefaultFuture} because the underlying connection abstraction was not designed for
+ * multiple protocol.
+ * TODO Remove this class and abstract common logic for waiting async result.
*/
public class DefaultFuture2 extends CompletableFuture<Object> {
- private static GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(
- () -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true),
- 30, TimeUnit.MILLISECONDS),
- () -> destroy());
-
+ private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER =
+ new GlobalResourceInitializer<>(() -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true),
+ 30, TimeUnit.MILLISECONDS), DefaultFuture2::destroy);
private static final Logger logger = LoggerFactory.getLogger(DefaultFuture2.class);
private static final Map<Long, DefaultFuture2> FUTURES = new ConcurrentHashMap<>();
-
// invoke id.
private final Request request;
private final Connection connection;
private final int timeout;
private final long start = System.currentTimeMillis();
+ private final List<Runnable> timeoutListeners = new ArrayList<>();
private volatile long sent;
private Timeout timeoutCheckTask;
-
- private List<Runnable> timeoutListeners = new ArrayList<>();
-
private ExecutorService executor;
private DefaultFuture2(Connection client2, Request request, int timeout) {
@@ -75,19 +73,11 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
FUTURES.put(request.getId(), this);
}
- public void addTimeoutListener(Runnable runnable) {
- timeoutListeners.add(runnable);
- }
-
public static void addTimeoutListener(long id, Runnable runnable) {
DefaultFuture2 defaultFuture2 = FUTURES.get(id);
defaultFuture2.addTimeoutListener(runnable);
}
- public List<Runnable> getTimeoutListeners() {
- return timeoutListeners;
- }
-
/**
* check time out of the future
*/
@@ -97,7 +87,7 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
}
public static void destroy() {
- TIME_OUT_TIMER.remove(timer-> timer.stop());
+ TIME_OUT_TIMER.remove(Timer::stop);
FUTURES.clear();
}
@@ -149,13 +139,21 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
- + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
- + ", response status is " + response.getStatus()
- + (connection == null ? "" : ", channel: " + connection.getChannel().localAddress()
- + " -> " + connection.getRemote()) + ", please check provider side for detailed result.");
+ + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ + ", response status is " + response.getStatus()
+ + (connection == null ? "" : ", channel: " + connection.getChannel().localAddress()
+ + " -> " + connection.getRemote()) + ", please check provider side for detailed result.");
}
}
+ public void addTimeoutListener(Runnable runnable) {
+ timeoutListeners.add(runnable);
+ }
+
+ public List<Runnable> getTimeoutListeners() {
+ return timeoutListeners;
+ }
+
public ExecutorService getExecutor() {
return executor;
}
@@ -202,7 +200,7 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
- " which is not an expected state, interrupt the thread manually by returning an exception."));
+ " which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
@@ -219,7 +217,6 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
return sent > 0;
}
-
private int getTimeout() {
return timeout;
}
@@ -231,13 +228,13 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
private String getTimeoutMessage(boolean scan) {
long nowTimestamp = System.currentTimeMillis();
return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
- + (scan ? " by scan timer" : "") + ". start time: "
- + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
- + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(nowTimestamp))) + ","
- + (sent > 0 ? " client elapsed: " + (sent - start)
- + " ms, server elapsed: " + (nowTimestamp - sent)
- : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
- + timeout + " ms, request: " + (logger.isDebugEnabled() ? request : getRequestWithoutData()) + ", channel: " + connection.getChannel();
+ + (scan ? " by scan timer" : "") + ". start time: "
+ + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
+ + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(nowTimestamp))) + ","
+ + (sent > 0 ? " client elapsed: " + (sent - start)
+ + " ms, server elapsed: " + (nowTimestamp - sent)
+ : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
+ + timeout + " ms, request: " + (logger.isDebugEnabled() ? request : getRequestWithoutData()) + ", channel: " + connection.getChannel();
}
private Request getRequestWithoutData() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 97dbe04..2d75ca0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -143,11 +143,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
execute(() -> {
- channel.pipeline()
- .addLast(new TripleHttp2ClientResponseHandler())
- .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
- .addLast(new TripleClientInboundHandler());
- channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(this);
final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(channel, promise);
subscribe(clientTransportObserver);
try {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java
index 3490cd2..54f8415 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java
@@ -21,6 +21,9 @@ import org.apache.dubbo.common.extension.ExtensionScope;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Invoker;
+/**
+ * PathResolver maintains a mapping between request path and Invoker for multiple protocols.
+ */
@SPI(value = CommonConstants.TRIPLE, scope = ExtensionScope.FRAMEWORK)
public interface PathResolver {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerUnaryInboundTransportObserver.java
similarity index 91%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerUnaryInboundTransportObserver.java
index 8e9ed66..268a725 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerUnaryInboundTransportObserver.java
@@ -17,7 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri;
-abstract class UnaryInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
+abstract class ServerUnaryInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
protected static final String DUPLICATED_DATA = "Duplicated data";
private byte[] data;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index ba2da9a..7d11b30 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -37,23 +37,25 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof Request) {
- writeRequest(ctx, (Request) msg, promise);
- } else {
+ if (!(msg instanceof Request)) {
super.write(ctx, msg, promise);
+ return;
}
- }
-
- private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
+ final Request req = (Request) msg;
Connection connection = Connection.getConnectionFromChannel(ctx.channel());
final AbstractClientStream stream = AbstractClientStream.newClientStream(req, connection);
final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
streamChannelBootstrap.open()
.addListener(future -> {
if (future.isSuccess()) {
- final Http2StreamChannel curChannel = (Http2StreamChannel) future.get();
+ final Http2StreamChannel channel = (Http2StreamChannel) future.get();
+ channel.pipeline()
+ .addLast(new TripleHttp2ClientResponseHandler())
+ .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
+ .addLast(new TripleClientInboundHandler());
+ channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
// Start call only when the channel creation is successful
- stream.startCall(curChannel, promise);
+ stream.startCall(channel, promise);
} else {
promise.tryFailure(future.cause());
DefaultFuture2.getFuture(req.getId()).cancel();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 91ace1c..8d7ec48 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -16,8 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import org.apache.dubbo.common.constants.CommonConstants;
-
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
@@ -41,7 +39,6 @@ public class TripleConstant {
public static final AsciiString HTTPS_SCHEME = AsciiString.of("https");
public static final AsciiString HTTP_SCHEME = AsciiString.of("http");
- public static final AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
public static final AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.valueOf("tri_server_stream");
public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.valueOf("tri_client_stream");
@@ -50,7 +47,7 @@ public class TripleConstant {
public static final Metadata SUCCESS_RESPONSE_META = getSuccessResponseMeta();
- private static Metadata getSuccessResponseMeta() {
+ static Metadata getSuccessResponseMeta() {
Metadata metadata = new DefaultMetadata();
metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_MESSAGE);
metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_STATUS);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 7c9ff0b..f450e47 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -51,12 +51,12 @@ import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TripleHttp2FrameServerHandler.class);
- private final PathResolver PATH_RESOLVER;
+ private final PathResolver pathResolver;
private final FrameworkModel frameworkModel;
public TripleHttp2FrameServerHandler(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
- this.PATH_RESOLVER = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
+ this.pathResolver = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
}
@Override
@@ -116,9 +116,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final String group = headers.contains(TripleHeaderEnum.SERVICE_GROUP.getHeader()) ? headers.get(TripleHeaderEnum.SERVICE_GROUP.getHeader())
.toString() : null;
final String key = URL.buildKey(serviceName, group, version);
- Invoker<?> invoker = PATH_RESOLVER.resolve(key);
+ Invoker<?> invoker = pathResolver.resolve(key);
if (invoker == null) {
- invoker = PATH_RESOLVER.resolve(serviceName);
+ invoker = pathResolver.resolve(serviceName);
}
return invoker;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 8c301dc..bb1567a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -25,6 +25,8 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
@@ -72,7 +74,17 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
.maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
.frameLogger(SERVER_LOGGER)
.build();
- final Http2MultiplexHandler handler = new Http2MultiplexHandler(new TripleServerInitializer(frameworkModel));
+ final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer<Channel>() {
+
+ @Override
+ protected void initChannel(Channel ch) {
+ final ChannelPipeline p = ch.pipeline();
+ p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
+ // TODO constraint MAX DATA_SIZE
+ p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
+ p.addLast(new TripleServerInboundHandler());
+ }
+ });
pipeline.addLast(codec, new TripleServerConnectionHandler(), handler);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 7046bf2..115970e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -21,7 +21,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
final byte[] data = (byte[]) msg;
if (serverStream != null) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
deleted file mode 100644
index 7120aef..0000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri;
-
-import org.apache.dubbo.rpc.model.FrameworkModel;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-
-public class TripleServerInitializer extends ChannelInitializer<Channel> {
-
- private final FrameworkModel frameworkModel;
-
- public TripleServerInitializer(FrameworkModel frameworkModel) {
- this.frameworkModel = frameworkModel;
- }
-
- @Override
- protected void initChannel(Channel ch) throws Exception {
- final ChannelPipeline p = ch.pipeline();
- p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
- // TODO constraint MAX DATA_SIZE
- p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
- p.addLast(new TripleServerInboundHandler());
- }
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 774b98a..30e8b94 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -69,7 +69,7 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
return map;
}
- private class ClientUnaryInboundTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
+ private class ClientUnaryInboundTransportObserver extends ServerUnaryInboundTransportObserver implements TransportObserver {
@Override
public void onComplete() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 663ef48..0cc5efa 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -45,7 +45,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
return new UnaryServerTransportObserver();
}
- private class UnaryServerTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
+ private class UnaryServerTransportObserver extends ServerUnaryInboundTransportObserver implements TransportObserver {
@Override
public void onError(GrpcStatus status) {
transportError(status);
@@ -85,10 +85,11 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
outboundTransportObserver().onMetadata(metadata, false);
final byte[] data = encodeResponse(response.getValue());
if (data == null) {
+ // already handled in encodeResponse()
return;
}
outboundTransportObserver().onData(data, false);
- Metadata trailers = TripleConstant.SUCCESS_RESPONSE_META;
+ Metadata trailers = TripleConstant.getSuccessResponseMeta();
convertAttachment(trailers, response.getObjectAttachments());
outboundTransportObserver().onMetadata(trailers, true);
});