You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/30 06:30:16 UTC

[dubbo] branch 3.0 updated: Remove unused client handler (#8955)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 8c60208  Remove unused client handler (#8955)
8c60208 is described below

commit 8c602088dc0aef307ac1599dc19d54b407f2171c
Author: GuoHao <gu...@gmail.com>
AuthorDate: Thu Sep 30 14:30:02 2021 +0800

    Remove unused client handler (#8955)
    
    * Remove unused client handler
    
    * Separate multiple http2 handler and request handler
    
    * Fix codestyle
---
 .../rpc/protocol/tri/TripleClientHandler.java      | 112 +--------------------
 ...andler.java => TripleClientRequestHandler.java} |  21 +---
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  11 +-
 3 files changed, 6 insertions(+), 138 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 9d1795a..96c8097 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,35 +16,14 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.remoting.api.Connection;
 import org.apache.dubbo.remoting.api.ConnectionHandler;
-import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
-import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
-import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
 
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
 import io.netty.util.ReferenceCountUtil;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class TripleClientHandler extends ChannelDuplexHandler {
 
     private final FrameworkModel frameworkModel;
@@ -54,98 +33,11 @@ public class TripleClientHandler extends ChannelDuplexHandler {
     }
 
     @Override
-    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
-        if (msg instanceof Request) {
-            writeRequest(ctx, (Request) msg, promise);
-        } else {
-            super.write(ctx, msg, promise);
-        }
-    }
-
-    @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        if (msg instanceof Http2SettingsFrame) {
-            // already handled
-        } else if (msg instanceof Http2GoAwayFrame) {
+        if (msg instanceof Http2GoAwayFrame) {
             final ConnectionHandler connectionHandler = ctx.pipeline().get(ConnectionHandler.class);
             connectionHandler.onGoAway(ctx.channel());
-            ReferenceCountUtil.release(msg);
-        } else {
-            ReferenceCountUtil.release(msg);
-        }
-    }
-
-    private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
-        DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
-        final RpcInvocation inv = (RpcInvocation) req.getData();
-        final URL url = inv.getInvoker().getUrl();
-        ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
-
-        MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel,inv);
-
-        ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
-        AbstractClientStream stream;
-        if (methodDescriptor.isUnary()) {
-            stream = AbstractClientStream.unary(url);
-        } else {
-            stream = AbstractClientStream.stream(url);
-        }
-        final CancellationContext cancellationContext = inv.getCancellationContext();
-        // for client cancel,send rst frame to server
-        cancellationContext.addListener(context -> {
-            stream.asTransportObserver().onReset(Http2Error.CANCEL);;
-        });
-        stream.setCancellationContext(cancellationContext);
-
-        String ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY);
-        if (StringUtils.isNotEmpty(ssl)) {
-            ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
-        }
-        stream.service(consumerModel)
-            .connection(Connection.getConnectionFromChannel(ctx.channel()))
-            .method(methodDescriptor)
-            .methodName(methodDescriptor.getMethodName())
-            .request(req)
-            .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
-            .subscribe(new ClientTransportObserver(ctx, stream, promise));
-
-        if (methodDescriptor.isUnary()) {
-            stream.asStreamObserver().onNext(inv);
-            stream.asStreamObserver().onCompleted();
-        } else {
-            Response response = new Response(req.getId(), req.getVersion());
-            AppResponse result;
-            // the stream method params is fixed
-            if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
-                || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
-                final StreamObserver<Object> streamObserver = (StreamObserver<Object>) inv.getArguments()[0];
-                stream.subscribe(streamObserver);
-                result = new AppResponse(stream.asStreamObserver());
-            } else {
-                final StreamObserver<Object> streamObserver = (StreamObserver<Object>) inv.getArguments()[1];
-                stream.subscribe(streamObserver);
-                result = new AppResponse();
-                stream.asStreamObserver().onNext(inv.getArguments()[0]);
-                stream.asStreamObserver().onCompleted();
-            }
-            response.setResult(result);
-            DefaultFuture2.received(stream.getConnection(), response);
-        }
-    }
-
-    /**
-     * Get the trI protocol special MethodDescriptor
-     */
-    private MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
-        List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
-        if (CollectionUtils.isEmpty(methodDescriptors)) {
-            throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
-        }
-        for (MethodDescriptor methodDescriptor : methodDescriptors) {
-            if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
-                return methodDescriptor;
-            }
         }
-        throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+        ReferenceCountUtil.release(msg);
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
similarity index 87%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 9d1795a..2254343 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -23,7 +23,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.api.Connection;
-import org.apache.dubbo.remoting.api.ConnectionHandler;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
@@ -38,18 +37,15 @@ import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
 
 import java.util.Arrays;
 import java.util.List;
 
-public class TripleClientHandler extends ChannelDuplexHandler {
+public class TripleClientRequestHandler extends ChannelDuplexHandler {
 
     private final FrameworkModel frameworkModel;
 
-    public TripleClientHandler(FrameworkModel frameworkModel) {
+    public TripleClientRequestHandler(FrameworkModel frameworkModel) {
         this.frameworkModel = frameworkModel;
     }
 
@@ -62,19 +58,6 @@ public class TripleClientHandler extends ChannelDuplexHandler {
         }
     }
 
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        if (msg instanceof Http2SettingsFrame) {
-            // already handled
-        } else if (msg instanceof Http2GoAwayFrame) {
-            final ConnectionHandler connectionHandler = ctx.pipeline().get(ConnectionHandler.class);
-            connectionHandler.onGoAway(ctx.channel());
-            ReferenceCountUtil.release(msg);
-        } else {
-            ReferenceCountUtil.release(msg);
-        }
-    }
-
     private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
         DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
         final RpcInvocation inv = (RpcInvocation) req.getData();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index b62b27f..455175d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -24,9 +24,7 @@ import org.apache.dubbo.remoting.api.Http2WireProtocol;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.ScopeModelAware;
 
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http2.Http2FrameCodec;
 import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
 import io.netty.handler.codec.http2.Http2MultiplexHandler;
@@ -85,12 +83,7 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
                         .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
                 .frameLogger(CLIENT_LOGGER)
                 .build();
-        final Http2MultiplexHandler handler = new Http2MultiplexHandler(new SimpleChannelInboundHandler<Object>() {
-            @Override
-            protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
-                // empty
-            }
-        });
-        pipeline.addLast(codec, handler, new TripleClientHandler(frameworkModel));
+        final Http2MultiplexHandler handler = new Http2MultiplexHandler(new TripleClientHandler(frameworkModel));
+        pipeline.addLast(codec, handler, new TripleClientRequestHandler(frameworkModel));
     }
 }