You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/16 16:15:33 UTC

[pulsar] 03/04: Fixed ProxyConnection to check for existence of auth_data field (#12057)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6d3966c477379b36e38bf5aa5fff029be9221ae9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 16 08:04:29 2021 -0700

    Fixed ProxyConnection to check for existence of auth_data field (#12057)
    
    ### Motivation
    
    ProxyConnection is not checking whether the optional auth_data field is set or not.
    
    Additionally, if there's an error before the client instance is created, we should avoid calling close on it.
    
    ```
    2021-09-16T00:49:40,213 [pulsar-proxy-io-2-3] WARN  org.apache.pulsar.proxy.server.ProxyConnection - [/10.199.78.158:7165] Unable to authenticate:
    java.lang.IllegalStateException: Field 'auth_data' is not set
        at org.apache.pulsar.common.api.proto.CommandConnect.getAuthDataSlice(CommandConnect.java:90) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.common.api.proto.CommandConnect.getAuthData(CommandConnect.java:83) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:318) [org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:166) [org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:192) [org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1374) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1248) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1288) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
    2021-09-16T00:49:40,215 [pulsar-proxy-io-2-3] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usua
    java.lang.NullPointerException: null
        at org.apache.pulsar.proxy.server.ProxyConnection.close(ProxyConnection.java:416) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:356) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:166) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:192) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1374) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1248) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1288) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
    ```
---
 .../main/java/org/apache/pulsar/proxy/server/ProxyConnection.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 25f88102..28c6083 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -315,7 +315,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 return;
             }
 
-            AuthData clientData = AuthData.of(connect.getAuthData());
+            AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : null);
             if (connect.hasAuthMethodName()) {
                 authMethod = connect.getAuthMethodName();
             } else if (connect.hasAuthMethod()) {
@@ -413,7 +413,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         state = State.Closed;
         ctx.close();
         try {
-            client.close();
+            if (client != null) {
+                client.close();
+            }
         } catch (PulsarClientException e) {
             LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
         }