You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ea...@apache.org on 2022/04/02 13:20:27 UTC

[dubbo] branch 3.0 updated: Fix tls not work when use pu handler (#9882)

This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3469d7c  Fix tls not work when use pu handler (#9882)
3469d7c is described below

commit 3469d7c731632b56391ee6d4c2d1271c20b7a38e
Author: GuoHao <gu...@gmail.com>
AuthorDate: Sat Apr 2 21:19:52 2022 +0800

    Fix tls not work when use pu handler (#9882)
---
 .../dubbo/remoting/api/PortUnificationServer.java  | 22 ++++--
 .../remoting/api/PortUnificationServerHandler.java | 81 ++++++++++++++--------
 .../protocol/tri/stream/TripleServerStream.java    |  6 +-
 3 files changed, 73 insertions(+), 36 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
index 7ce4932..1ddf324 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
@@ -76,9 +76,11 @@ public class PortUnificationServer {
         // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
         // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
         this.url = ExecutorUtil.setThreadName(url, "DubboPUServerHandler");
-        this.protocols = ExtensionLoader.getExtensionLoader(WireProtocol.class).getActivateExtension(url, new String[0]);
+        this.protocols = ExtensionLoader.getExtensionLoader(WireProtocol.class)
+            .getActivateExtension(url, new String[0]);
         // read config before destroy
-        serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
+        serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(
+            getUrl().getOrDefaultModuleModel());
     }
 
     public URL getUrl() {
@@ -122,12 +124,16 @@ public class PortUnificationServer {
 //                        p.addLast(new LoggingHandler(LogLevel.DEBUG));
 
                     final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
+                    final PortUnificationServerHandler puHandler;
                     if (enableSsl) {
-                        p.addLast("negotiation-ssl", new SslServerTlsHandler(getUrl()));
+                        puHandler = new PortUnificationServerHandler(url,
+                            SslContexts.buildServerSslContext(url), true, protocols);
+                    } else {
+                        puHandler = new PortUnificationServerHandler(url, null, false, protocols);
                     }
 
-                    final PortUnificationServerHandler puHandler = new PortUnificationServerHandler(url, protocols);
-                    p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
+                    p.addLast("server-idle-handler",
+                        new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
                     p.addLast("negotiation-protocol", puHandler);
                     channelGroup = puHandler.getChannels();
                 }
@@ -173,8 +179,10 @@ public class PortUnificationServer {
             if (bootstrap != null) {
                 long timeout = serverShutdownTimeoutMills;
                 long quietPeriod = Math.min(2000L, timeout);
-                Future<?> bossGroupShutdownFuture = bossGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS);
-                Future<?> workerGroupShutdownFuture = workerGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS);
+                Future<?> bossGroupShutdownFuture = bossGroup.shutdownGracefully(quietPeriod,
+                    timeout, MILLISECONDS);
+                Future<?> workerGroupShutdownFuture = workerGroup.shutdownGracefully(quietPeriod,
+                    timeout, MILLISECONDS);
                 bossGroupShutdownFuture.awaitUninterruptibly(timeout, MILLISECONDS);
                 workerGroupShutdownFuture.awaitUninterruptibly(timeout, MILLISECONDS);
             }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
index 4e15beb..1a28cb4 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
@@ -23,9 +23,11 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.util.List;
@@ -38,18 +40,21 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
 
     private final SslContext sslCtx;
     private final URL url;
+    private final boolean detectSsl;
     private final List<WireProtocol> protocols;
     private final DefaultChannelGroup channels = new DefaultChannelGroup(
         GlobalEventExecutor.INSTANCE);
 
     public PortUnificationServerHandler(URL url, List<WireProtocol> protocols) {
-        this(url, null, protocols);
+        this(url, null, false, protocols);
     }
 
-    public PortUnificationServerHandler(URL url, SslContext sslCtx, List<WireProtocol> protocols) {
+    public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
+        List<WireProtocol> protocols) {
         this.url = url;
         this.sslCtx = sslCtx;
         this.protocols = protocols;
+        this.detectSsl = detectSsl;
     }
 
     @Override
@@ -81,34 +86,54 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
             return;
         }
 
-        for (final WireProtocol protocol : protocols) {
-            in.markReaderIndex();
-            final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
-            in.resetReaderIndex();
-            switch (result) {
-                case UNRECOGNIZED:
-                    continue;
-                case RECOGNIZED:
-                    protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
-                    ctx.pipeline().remove(this);
-                case NEED_MORE_DATA:
-                    return;
-                default:
-                    return;
+        if (isSsl(in)) {
+            enableSsl(ctx);
+        } else {
+            for (final WireProtocol protocol : protocols) {
+                in.markReaderIndex();
+                final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
+                in.resetReaderIndex();
+                switch (result) {
+                    case UNRECOGNIZED:
+                        continue;
+                    case RECOGNIZED:
+                        protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
+                        ctx.pipeline().remove(this);
+                    case NEED_MORE_DATA:
+                        return;
+                    default:
+                        return;
+                }
             }
+            byte[] preface = new byte[in.readableBytes()];
+            in.readBytes(preface);
+            Set<String> supported = url.getApplicationModel()
+                .getExtensionLoader(WireProtocol.class)
+                .getSupportedExtensions();
+            LOGGER.error(String.format("Can not recognize protocol from downstream=%s . "
+                    + "preface=%s protocols=%s", ctx.channel().remoteAddress(),
+                Bytes.bytes2hex(preface),
+                supported));
+
+            // Unknown protocol; discard everything and close the connection.
+            in.clear();
+            ctx.close();
         }
-        byte[] preface = new byte[in.readableBytes()];
-        in.readBytes(preface);
-        Set<String> supported = url.getApplicationModel()
-            .getExtensionLoader(WireProtocol.class)
-            .getSupportedExtensions();
-        LOGGER.error(String.format("Can not recognize protocol from downstream=%s . "
-                + "preface=%s protocols=%s", ctx.channel().remoteAddress(), Bytes.bytes2hex(preface),
-            supported));
-
-        // Unknown protocol; discard everything and close the connection.
-        in.clear();
-        ctx.close();
     }
 
+    private void enableSsl(ChannelHandlerContext ctx) {
+        ChannelPipeline p = ctx.pipeline();
+        p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
+        p.addLast("unificationA", new PortUnificationServerHandler(url, sslCtx, false, protocols));
+        p.remove(this);
+    }
+
+    private boolean isSsl(ByteBuf buf) {
+        if (detectSsl) {
+            return SslHandler.isEncrypted(buf);
+        }
+        return false;
+    }
+
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index 391eabb..3eab0c8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -384,7 +384,8 @@ public class TripleServerStream extends AbstractStream implements ServerStream {
             Map<String, Object> requestMetadata = headersToMap(headers);
             boolean hasStub = pathResolver.hasNativeStub(path);
             if (hasStub) {
-                listener = new StubAbstractServerCall(invoker, TripleServerStream.this, frameworkModel,
+                listener = new StubAbstractServerCall(invoker, TripleServerStream.this,
+                    frameworkModel,
                     acceptEncoding, serviceName, originalMethodName, executor);
             } else {
                 listener = new ReflectionAbstractServerCall(invoker, TripleServerStream.this,
@@ -407,6 +408,9 @@ public class TripleServerStream extends AbstractStream implements ServerStream {
         }
 
         private void doOnData(ByteBuf data, boolean endStream) {
+            if (deframer == null) {
+                return;
+            }
             deframer.deframe(data);
             if (endStream) {
                 deframer.close();