You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/30 06:30:16 UTC
[dubbo] branch 3.0 updated: Remove unused client handler (#8955)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 8c60208 Remove unused client handler (#8955)
8c60208 is described below
commit 8c602088dc0aef307ac1599dc19d54b407f2171c
Author: GuoHao <gu...@gmail.com>
AuthorDate: Thu Sep 30 14:30:02 2021 +0800
Remove unused client handler (#8955)
* Remove unused client handler
* Separate multiple http2 handler and request handler
* Fix code style
---
.../rpc/protocol/tri/TripleClientHandler.java | 112 +--------------------
...andler.java => TripleClientRequestHandler.java} | 21 +---
.../rpc/protocol/tri/TripleHttp2Protocol.java | 11 +-
3 files changed, 6 insertions(+), 138 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 9d1795a..96c8097 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,35 +16,14 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.remoting.api.ConnectionHandler;
-import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
-import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
-import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
import io.netty.util.ReferenceCountUtil;
-import java.util.Arrays;
-import java.util.List;
-
public class TripleClientHandler extends ChannelDuplexHandler {
private final FrameworkModel frameworkModel;
@@ -54,98 +33,11 @@ public class TripleClientHandler extends ChannelDuplexHandler {
}
@Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof Request) {
- writeRequest(ctx, (Request) msg, promise);
- } else {
- super.write(ctx, msg, promise);
- }
- }
-
- @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof Http2SettingsFrame) {
- // already handled
- } else if (msg instanceof Http2GoAwayFrame) {
+ if (msg instanceof Http2GoAwayFrame) {
final ConnectionHandler connectionHandler = ctx.pipeline().get(ConnectionHandler.class);
connectionHandler.onGoAway(ctx.channel());
- ReferenceCountUtil.release(msg);
- } else {
- ReferenceCountUtil.release(msg);
- }
- }
-
- private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
- DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
- final RpcInvocation inv = (RpcInvocation) req.getData();
- final URL url = inv.getInvoker().getUrl();
- ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
-
- MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel,inv);
-
- ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
- AbstractClientStream stream;
- if (methodDescriptor.isUnary()) {
- stream = AbstractClientStream.unary(url);
- } else {
- stream = AbstractClientStream.stream(url);
- }
- final CancellationContext cancellationContext = inv.getCancellationContext();
- // for client cancel,send rst frame to server
- cancellationContext.addListener(context -> {
- stream.asTransportObserver().onReset(Http2Error.CANCEL);;
- });
- stream.setCancellationContext(cancellationContext);
-
- String ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY);
- if (StringUtils.isNotEmpty(ssl)) {
- ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
- }
- stream.service(consumerModel)
- .connection(Connection.getConnectionFromChannel(ctx.channel()))
- .method(methodDescriptor)
- .methodName(methodDescriptor.getMethodName())
- .request(req)
- .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
- .subscribe(new ClientTransportObserver(ctx, stream, promise));
-
- if (methodDescriptor.isUnary()) {
- stream.asStreamObserver().onNext(inv);
- stream.asStreamObserver().onCompleted();
- } else {
- Response response = new Response(req.getId(), req.getVersion());
- AppResponse result;
- // the stream method params is fixed
- if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
- || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
- final StreamObserver<Object> streamObserver = (StreamObserver<Object>) inv.getArguments()[0];
- stream.subscribe(streamObserver);
- result = new AppResponse(stream.asStreamObserver());
- } else {
- final StreamObserver<Object> streamObserver = (StreamObserver<Object>) inv.getArguments()[1];
- stream.subscribe(streamObserver);
- result = new AppResponse();
- stream.asStreamObserver().onNext(inv.getArguments()[0]);
- stream.asStreamObserver().onCompleted();
- }
- response.setResult(result);
- DefaultFuture2.received(stream.getConnection(), response);
- }
- }
-
- /**
- * Get the trI protocol special MethodDescriptor
- */
- private MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
- List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
- if (CollectionUtils.isEmpty(methodDescriptors)) {
- throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
- }
- for (MethodDescriptor methodDescriptor : methodDescriptors) {
- if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
- return methodDescriptor;
- }
}
- throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+ ReferenceCountUtil.release(msg);
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
similarity index 87%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 9d1795a..2254343 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -23,7 +23,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.Connection;
-import org.apache.dubbo.remoting.api.ConnectionHandler;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
@@ -38,18 +37,15 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
import java.util.Arrays;
import java.util.List;
-public class TripleClientHandler extends ChannelDuplexHandler {
+public class TripleClientRequestHandler extends ChannelDuplexHandler {
private final FrameworkModel frameworkModel;
- public TripleClientHandler(FrameworkModel frameworkModel) {
+ public TripleClientRequestHandler(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
}
@@ -62,19 +58,6 @@ public class TripleClientHandler extends ChannelDuplexHandler {
}
}
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof Http2SettingsFrame) {
- // already handled
- } else if (msg instanceof Http2GoAwayFrame) {
- final ConnectionHandler connectionHandler = ctx.pipeline().get(ConnectionHandler.class);
- connectionHandler.onGoAway(ctx.channel());
- ReferenceCountUtil.release(msg);
- } else {
- ReferenceCountUtil.release(msg);
- }
- }
-
private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
final RpcInvocation inv = (RpcInvocation) req.getData();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index b62b27f..455175d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -24,9 +24,7 @@ import org.apache.dubbo.remoting.api.Http2WireProtocol;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
@@ -85,12 +83,7 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
.maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
.frameLogger(CLIENT_LOGGER)
.build();
- final Http2MultiplexHandler handler = new Http2MultiplexHandler(new SimpleChannelInboundHandler<Object>() {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
- // empty
- }
- });
- pipeline.addLast(codec, handler, new TripleClientHandler(frameworkModel));
+ final Http2MultiplexHandler handler = new Http2MultiplexHandler(new TripleClientHandler(frameworkModel));
+ pipeline.addLast(codec, handler, new TripleClientRequestHandler(frameworkModel));
}
}