You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ea...@apache.org on 2022/04/02 13:20:27 UTC
[dubbo] branch 3.0 updated: Fix tls not work when use pu handler (#9882)
This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 3469d7c Fix tls not work when use pu handler (#9882)
3469d7c is described below
commit 3469d7c731632b56391ee6d4c2d1271c20b7a38e
Author: GuoHao <gu...@gmail.com>
AuthorDate: Sat Apr 2 21:19:52 2022 +0800
Fix tls not work when use pu handler (#9882)
---
.../dubbo/remoting/api/PortUnificationServer.java | 22 ++++--
.../remoting/api/PortUnificationServerHandler.java | 81 ++++++++++++++--------
.../protocol/tri/stream/TripleServerStream.java | 6 +-
3 files changed, 73 insertions(+), 36 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
index 7ce4932..1ddf324 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
@@ -76,9 +76,11 @@ public class PortUnificationServer {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
this.url = ExecutorUtil.setThreadName(url, "DubboPUServerHandler");
- this.protocols = ExtensionLoader.getExtensionLoader(WireProtocol.class).getActivateExtension(url, new String[0]);
+ this.protocols = ExtensionLoader.getExtensionLoader(WireProtocol.class)
+ .getActivateExtension(url, new String[0]);
// read config before destroy
- serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
+ serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(
+ getUrl().getOrDefaultModuleModel());
}
public URL getUrl() {
@@ -122,12 +124,16 @@ public class PortUnificationServer {
// p.addLast(new LoggingHandler(LogLevel.DEBUG));
final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
+ final PortUnificationServerHandler puHandler;
if (enableSsl) {
- p.addLast("negotiation-ssl", new SslServerTlsHandler(getUrl()));
+ puHandler = new PortUnificationServerHandler(url,
+ SslContexts.buildServerSslContext(url), true, protocols);
+ } else {
+ puHandler = new PortUnificationServerHandler(url, null, false, protocols);
}
- final PortUnificationServerHandler puHandler = new PortUnificationServerHandler(url, protocols);
- p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
+ p.addLast("server-idle-handler",
+ new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
p.addLast("negotiation-protocol", puHandler);
channelGroup = puHandler.getChannels();
}
@@ -173,8 +179,10 @@ public class PortUnificationServer {
if (bootstrap != null) {
long timeout = serverShutdownTimeoutMills;
long quietPeriod = Math.min(2000L, timeout);
- Future<?> bossGroupShutdownFuture = bossGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS);
- Future<?> workerGroupShutdownFuture = workerGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS);
+ Future<?> bossGroupShutdownFuture = bossGroup.shutdownGracefully(quietPeriod,
+ timeout, MILLISECONDS);
+ Future<?> workerGroupShutdownFuture = workerGroup.shutdownGracefully(quietPeriod,
+ timeout, MILLISECONDS);
bossGroupShutdownFuture.awaitUninterruptibly(timeout, MILLISECONDS);
workerGroupShutdownFuture.awaitUninterruptibly(timeout, MILLISECONDS);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
index 4e15beb..1a28cb4 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
@@ -23,9 +23,11 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.List;
@@ -38,18 +40,21 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
private final SslContext sslCtx;
private final URL url;
+ private final boolean detectSsl;
private final List<WireProtocol> protocols;
private final DefaultChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
public PortUnificationServerHandler(URL url, List<WireProtocol> protocols) {
- this(url, null, protocols);
+ this(url, null, false, protocols);
}
- public PortUnificationServerHandler(URL url, SslContext sslCtx, List<WireProtocol> protocols) {
+ public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
+ List<WireProtocol> protocols) {
this.url = url;
this.sslCtx = sslCtx;
this.protocols = protocols;
+ this.detectSsl = detectSsl;
}
@Override
@@ -81,34 +86,54 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
return;
}
- for (final WireProtocol protocol : protocols) {
- in.markReaderIndex();
- final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
- in.resetReaderIndex();
- switch (result) {
- case UNRECOGNIZED:
- continue;
- case RECOGNIZED:
- protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
- ctx.pipeline().remove(this);
- case NEED_MORE_DATA:
- return;
- default:
- return;
+ if (isSsl(in)) {
+ enableSsl(ctx);
+ } else {
+ for (final WireProtocol protocol : protocols) {
+ in.markReaderIndex();
+ final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
+ in.resetReaderIndex();
+ switch (result) {
+ case UNRECOGNIZED:
+ continue;
+ case RECOGNIZED:
+ protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
+ ctx.pipeline().remove(this);
+ case NEED_MORE_DATA:
+ return;
+ default:
+ return;
+ }
}
+ byte[] preface = new byte[in.readableBytes()];
+ in.readBytes(preface);
+ Set<String> supported = url.getApplicationModel()
+ .getExtensionLoader(WireProtocol.class)
+ .getSupportedExtensions();
+ LOGGER.error(String.format("Can not recognize protocol from downstream=%s . "
+ + "preface=%s protocols=%s", ctx.channel().remoteAddress(),
+ Bytes.bytes2hex(preface),
+ supported));
+
+ // Unknown protocol; discard everything and close the connection.
+ in.clear();
+ ctx.close();
}
- byte[] preface = new byte[in.readableBytes()];
- in.readBytes(preface);
- Set<String> supported = url.getApplicationModel()
- .getExtensionLoader(WireProtocol.class)
- .getSupportedExtensions();
- LOGGER.error(String.format("Can not recognize protocol from downstream=%s . "
- + "preface=%s protocols=%s", ctx.channel().remoteAddress(), Bytes.bytes2hex(preface),
- supported));
-
- // Unknown protocol; discard everything and close the connection.
- in.clear();
- ctx.close();
}
+ private void enableSsl(ChannelHandlerContext ctx) {
+ ChannelPipeline p = ctx.pipeline();
+ p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
+ p.addLast("unificationA", new PortUnificationServerHandler(url, sslCtx, false, protocols));
+ p.remove(this);
+ }
+
+ private boolean isSsl(ByteBuf buf) {
+ if (detectSsl) {
+ return SslHandler.isEncrypted(buf);
+ }
+ return false;
+ }
+
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index 391eabb..3eab0c8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -384,7 +384,8 @@ public class TripleServerStream extends AbstractStream implements ServerStream {
Map<String, Object> requestMetadata = headersToMap(headers);
boolean hasStub = pathResolver.hasNativeStub(path);
if (hasStub) {
- listener = new StubAbstractServerCall(invoker, TripleServerStream.this, frameworkModel,
+ listener = new StubAbstractServerCall(invoker, TripleServerStream.this,
+ frameworkModel,
acceptEncoding, serviceName, originalMethodName, executor);
} else {
listener = new ReflectionAbstractServerCall(invoker, TripleServerStream.this,
@@ -407,6 +408,9 @@ public class TripleServerStream extends AbstractStream implements ServerStream {
}
private void doOnData(ByteBuf data, boolean endStream) {
+ if (deframer == null) {
+ return;
+ }
deframer.deframe(data);
if (endStream) {
deframer.close();