You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/27 06:21:35 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] Fix missing response and duplicate trailers (#9140)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 448da6e  [3.0-Triple] Fix missing response and duplicate trailers (#9140)
448da6e is described below

commit 448da6ea8696d8c73cc2b83dfd5401bd83cc3b06
Author: GuoHao <gu...@gmail.com>
AuthorDate: Wed Oct 27 14:21:12 2021 +0800

    [3.0-Triple] Fix missing response and duplicate trailers (#9140)
    
    * Fix missing response and duplicate trailers
    
    * Some minor fix
---
 .../remoting/exchange/support/DefaultFuture2.java  | 61 ++++++++++------------
 .../rpc/protocol/tri/AbstractClientStream.java     |  5 --
 .../dubbo/rpc/protocol/tri/PathResolver.java       |  3 ++
 ...va => ServerUnaryInboundTransportObserver.java} |  2 +-
 .../protocol/tri/TripleClientRequestHandler.java   | 18 ++++---
 .../dubbo/rpc/protocol/tri/TripleConstant.java     |  5 +-
 .../tri/TripleHttp2FrameServerHandler.java         |  8 +--
 .../rpc/protocol/tri/TripleHttp2Protocol.java      | 14 ++++-
 .../protocol/tri/TripleServerInboundHandler.java   |  2 +-
 .../rpc/protocol/tri/TripleServerInitializer.java  | 41 ---------------
 .../dubbo/rpc/protocol/tri/UnaryClientStream.java  |  2 +-
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  |  5 +-
 12 files changed, 66 insertions(+), 100 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
index de8baf1..e9eb4f3 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
@@ -43,28 +43,26 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
- * DefaultFuture.
+ * DefaultFuture2.
+ * This class duplicates {@link DefaultFuture} because the underlying connection abstraction was not designed for
+ * multiple protocols.
+ * TODO Remove this class and extract the common logic for waiting on an async result.
  */
 public class DefaultFuture2 extends CompletableFuture<Object> {
 
-    private static GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(
-        () -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true),
-            30, TimeUnit.MILLISECONDS),
-        () -> destroy());
-
+    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER =
+        new GlobalResourceInitializer<>(() -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true),
+            30, TimeUnit.MILLISECONDS), DefaultFuture2::destroy);
     private static final Logger logger = LoggerFactory.getLogger(DefaultFuture2.class);
     private static final Map<Long, DefaultFuture2> FUTURES = new ConcurrentHashMap<>();
-
     // invoke id.
     private final Request request;
     private final Connection connection;
     private final int timeout;
     private final long start = System.currentTimeMillis();
+    private final List<Runnable> timeoutListeners = new ArrayList<>();
     private volatile long sent;
     private Timeout timeoutCheckTask;
-
-    private List<Runnable> timeoutListeners = new ArrayList<>();
-
     private ExecutorService executor;
 
     private DefaultFuture2(Connection client2, Request request, int timeout) {
@@ -75,19 +73,11 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
         FUTURES.put(request.getId(), this);
     }
 
-    public void addTimeoutListener(Runnable runnable) {
-        timeoutListeners.add(runnable);
-    }
-
     public static void addTimeoutListener(long id, Runnable runnable) {
         DefaultFuture2 defaultFuture2 = FUTURES.get(id);
         defaultFuture2.addTimeoutListener(runnable);
     }
 
-    public List<Runnable> getTimeoutListeners() {
-        return timeoutListeners;
-    }
-
     /**
      * check time out of the future
      */
@@ -97,7 +87,7 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
     }
 
     public static void destroy() {
-        TIME_OUT_TIMER.remove(timer-> timer.stop());
+        TIME_OUT_TIMER.remove(Timer::stop);
         FUTURES.clear();
     }
 
@@ -149,13 +139,21 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
             future.doReceived(response);
         } else {
             logger.warn("The timeout response finally returned at "
-                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
-                    + ", response status is " + response.getStatus()
-                    + (connection == null ? "" : ", channel: " + connection.getChannel().localAddress()
-                    + " -> " + connection.getRemote()) + ", please check provider side for detailed result.");
+                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+                + ", response status is " + response.getStatus()
+                + (connection == null ? "" : ", channel: " + connection.getChannel().localAddress()
+                + " -> " + connection.getRemote()) + ", please check provider side for detailed result.");
         }
     }
 
+    public void addTimeoutListener(Runnable runnable) {
+        timeoutListeners.add(runnable);
+    }
+
+    public List<Runnable> getTimeoutListeners() {
+        return timeoutListeners;
+    }
+
     public ExecutorService getExecutor() {
         return executor;
     }
@@ -202,7 +200,7 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
             if (threadlessExecutor.isWaiting()) {
                 threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
-                        " which is not an expected state, interrupt the thread manually by returning an exception."));
+                    " which is not an expected state, interrupt the thread manually by returning an exception."));
             }
         }
     }
@@ -219,7 +217,6 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
         return sent > 0;
     }
 
-
     private int getTimeout() {
         return timeout;
     }
@@ -231,13 +228,13 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
     private String getTimeoutMessage(boolean scan) {
         long nowTimestamp = System.currentTimeMillis();
         return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
-                + (scan ? " by scan timer" : "") + ". start time: "
-                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
-                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(nowTimestamp))) + ","
-                + (sent > 0 ? " client elapsed: " + (sent - start)
-                + " ms, server elapsed: " + (nowTimestamp - sent)
-                : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
-                + timeout + " ms, request: " + (logger.isDebugEnabled() ? request : getRequestWithoutData()) + ", channel: " + connection.getChannel();
+            + (scan ? " by scan timer" : "") + ". start time: "
+            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
+            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(nowTimestamp))) + ","
+            + (sent > 0 ? " client elapsed: " + (sent - start)
+            + " ms, server elapsed: " + (nowTimestamp - sent)
+            : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
+            + timeout + " ms, request: " + (logger.isDebugEnabled() ? request : getRequestWithoutData()) + ", channel: " + connection.getChannel();
     }
 
     private Request getRequestWithoutData() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 97dbe04..2d75ca0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -143,11 +143,6 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
 
     protected void startCall(Http2StreamChannel channel, ChannelPromise promise) {
         execute(() -> {
-            channel.pipeline()
-                .addLast(new TripleHttp2ClientResponseHandler())
-                .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
-                .addLast(new TripleClientInboundHandler());
-            channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(this);
             final ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(channel, promise);
             subscribe(clientTransportObserver);
             try {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java
index 3490cd2..54f8415 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PathResolver.java
@@ -21,6 +21,9 @@ import org.apache.dubbo.common.extension.ExtensionScope;
 import org.apache.dubbo.common.extension.SPI;
 import org.apache.dubbo.rpc.Invoker;
 
+/**
+ * PathResolver maintains a mapping between request path and Invoker for multiple protocols.
+ */
 @SPI(value = CommonConstants.TRIPLE, scope = ExtensionScope.FRAMEWORK)
 public interface PathResolver {
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerUnaryInboundTransportObserver.java
similarity index 91%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerUnaryInboundTransportObserver.java
index 8e9ed66..268a725 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryInboundTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerUnaryInboundTransportObserver.java
@@ -17,7 +17,7 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-abstract class UnaryInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
+abstract class ServerUnaryInboundTransportObserver extends InboundTransportObserver implements TransportObserver {
     protected static final String DUPLICATED_DATA = "Duplicated data";
 
     private byte[] data;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index ba2da9a..7d11b30 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -37,23 +37,25 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
 
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
-        if (msg instanceof Request) {
-            writeRequest(ctx, (Request) msg, promise);
-        } else {
+        if (!(msg instanceof Request)) {
             super.write(ctx, msg, promise);
+            return;
         }
-    }
-
-    private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
+        final Request req = (Request) msg;
         Connection connection = Connection.getConnectionFromChannel(ctx.channel());
         final AbstractClientStream stream = AbstractClientStream.newClientStream(req, connection);
         final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(ctx.channel());
         streamChannelBootstrap.open()
             .addListener(future -> {
                 if (future.isSuccess()) {
-                    final Http2StreamChannel curChannel = (Http2StreamChannel) future.get();
+                    final Http2StreamChannel channel = (Http2StreamChannel) future.get();
+                    channel.pipeline()
+                        .addLast(new TripleHttp2ClientResponseHandler())
+                        .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
+                        .addLast(new TripleClientInboundHandler());
+                    channel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
                     // Start call only when the channel creation is successful
-                    stream.startCall(curChannel, promise);
+                    stream.startCall(channel, promise);
                 } else {
                     promise.tryFailure(future.cause());
                     DefaultFuture2.getFuture(req.getId()).cancel();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 91ace1c..8d7ec48 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -16,8 +16,6 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import org.apache.dubbo.common.constants.CommonConstants;
-
 import io.netty.util.AsciiString;
 import io.netty.util.AttributeKey;
 
@@ -41,7 +39,6 @@ public class TripleConstant {
     public static final AsciiString HTTPS_SCHEME = AsciiString.of("https");
     public static final AsciiString HTTP_SCHEME = AsciiString.of("http");
 
-    public static final AttributeKey<Boolean> SSL_ATTRIBUTE_KEY = AttributeKey.valueOf(CommonConstants.SSL_ENABLED_KEY);
     public static final AttributeKey<AbstractServerStream> SERVER_STREAM_KEY = AttributeKey.valueOf("tri_server_stream");
     public static final AttributeKey<AbstractClientStream> CLIENT_STREAM_KEY = AttributeKey.valueOf("tri_client_stream");
 
@@ -50,7 +47,7 @@ public class TripleConstant {
 
     public static final Metadata SUCCESS_RESPONSE_META = getSuccessResponseMeta();
 
-    private static Metadata getSuccessResponseMeta() {
+    static Metadata getSuccessResponseMeta() {
         Metadata metadata = new DefaultMetadata();
         metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_MESSAGE);
         metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), TripleConstant.SUCCESS_RESPONSE_STATUS);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 7c9ff0b..f450e47 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -51,12 +51,12 @@ import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 
 public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(TripleHttp2FrameServerHandler.class);
-    private final PathResolver PATH_RESOLVER;
+    private final PathResolver pathResolver;
     private final FrameworkModel frameworkModel;
 
     public TripleHttp2FrameServerHandler(FrameworkModel frameworkModel) {
         this.frameworkModel = frameworkModel;
-        this.PATH_RESOLVER = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
+        this.pathResolver = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
     }
 
     @Override
@@ -116,9 +116,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         final String group = headers.contains(TripleHeaderEnum.SERVICE_GROUP.getHeader()) ? headers.get(TripleHeaderEnum.SERVICE_GROUP.getHeader())
             .toString() : null;
         final String key = URL.buildKey(serviceName, group, version);
-        Invoker<?> invoker = PATH_RESOLVER.resolve(key);
+        Invoker<?> invoker = pathResolver.resolve(key);
         if (invoker == null) {
-            invoker = PATH_RESOLVER.resolve(serviceName);
+            invoker = pathResolver.resolve(serviceName);
         }
         return invoker;
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 8c301dc..bb1567a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -25,6 +25,8 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.ScopeModelAware;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http2.Http2FrameCodec;
 import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
@@ -72,7 +74,17 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
                 .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
             .frameLogger(SERVER_LOGGER)
             .build();
-        final Http2MultiplexHandler handler = new Http2MultiplexHandler(new TripleServerInitializer(frameworkModel));
+        final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer<Channel>() {
+
+            @Override
+            protected void initChannel(Channel ch) {
+                final ChannelPipeline p = ch.pipeline();
+                p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
+                // TODO constrain the maximum data size
+                p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
+                p.addLast(new TripleServerInboundHandler());
+            }
+        });
         pipeline.addLast(codec, new TripleServerConnectionHandler(), handler);
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 7046bf2..115970e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -21,7 +21,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 
 public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final AbstractServerStream serverStream = ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
         final byte[] data = (byte[]) msg;
         if (serverStream != null) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
deleted file mode 100644
index 7120aef..0000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInitializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri;
-
-import org.apache.dubbo.rpc.model.FrameworkModel;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-
-public class TripleServerInitializer extends ChannelInitializer<Channel> {
-
-    private final FrameworkModel frameworkModel;
-
-    public TripleServerInitializer(FrameworkModel frameworkModel) {
-        this.frameworkModel = frameworkModel;
-    }
-
-    @Override
-    protected void initChannel(Channel ch) throws Exception {
-        final ChannelPipeline p = ch.pipeline();
-        p.addLast(new TripleHttp2FrameServerHandler(frameworkModel));
-        // TODO constraint MAX DATA_SIZE
-        p.addLast(new GrpcDataDecoder(Integer.MAX_VALUE, false));
-        p.addLast(new TripleServerInboundHandler());
-    }
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 774b98a..30e8b94 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -69,7 +69,7 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
         return map;
     }
 
-    private class ClientUnaryInboundTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
+    private class ClientUnaryInboundTransportObserver extends ServerUnaryInboundTransportObserver implements TransportObserver {
 
         @Override
         public void onComplete() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 663ef48..0cc5efa 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -45,7 +45,7 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
         return new UnaryServerTransportObserver();
     }
 
-    private class UnaryServerTransportObserver extends UnaryInboundTransportObserver implements TransportObserver {
+    private class UnaryServerTransportObserver extends ServerUnaryInboundTransportObserver implements TransportObserver {
         @Override
         public void onError(GrpcStatus status) {
             transportError(status);
@@ -85,10 +85,11 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
                 outboundTransportObserver().onMetadata(metadata, false);
                 final byte[] data = encodeResponse(response.getValue());
                 if (data == null) {
+                    // already handled in encodeResponse()
                     return;
                 }
                 outboundTransportObserver().onData(data, false);
-                Metadata trailers = TripleConstant.SUCCESS_RESPONSE_META;
+                Metadata trailers = TripleConstant.getSuccessResponseMeta();
                 convertAttachment(trailers, response.getObjectAttachments());
                 outboundTransportObserver().onMetadata(trailers, true);
             });