You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/02/15 03:51:12 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] ServerCnx broken after recent cherry-picks (#19521)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 6132b46efae [fix][broker] ServerCnx broken after recent cherry-picks (#19521)
6132b46efae is described below

commit 6132b46efae60d87979966aca075b80ab7e2a87d
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Feb 14 19:35:24 2023 -0600

    [fix][broker] ServerCnx broken after recent cherry-picks (#19521)
    
    I broke all release branches when I cherry picked 2847dd19f6e8a546f4d45bf51eb2b72aae0869ce to them. This change takes some of the underlying logic from #19409, without taking the async logic.
    
    * Make changes to `ServerCnx` to make tests pass
    
    Tests are currently failing, so passing tests will show that this solution is correct.
    
    - [x] `doc-not-needed`
    
    (cherry picked from commit 8246da282ca38e891bdf8a4e9abc47f640b22384)
    (cherry picked from commit 15e4198e19ebb2045777c696ac39f969b2a57f66)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 49 +++++++++++-----------
 1 file changed, 24 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index be2386bd369..899bdd49626 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -626,15 +626,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     // complete the connect and sent newConnected command
     private void completeConnect(int clientProtoVersion, String clientVersion) {
-        if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
-            if (!isValidRoleAndOriginalPrincipal()) {
-                state = State.Failed;
-                service.getPulsarStats().recordConnectionCreateFail();
-                final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
-                ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
-                return;
-            }
-        }
         ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
@@ -651,7 +642,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
+    private void doAuthentication(AuthData clientData,
+                                   boolean useOriginalAuthState,
                                    int clientProtocolVersion,
                                    String clientVersion) throws Exception {
 
@@ -659,8 +651,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         // in presence of a proxy and if the proxy is forwarding the credentials).
         // In this case, the re-validation needs to be done against the original client
         // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? originalAuthState : this.authState;
+        AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
         String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
         AuthData brokerData = authState.authenticate(clientData);
 
@@ -693,6 +684,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
             if (state != State.Connected) {
                 // First time authentication is done
+                if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
+                    if (!isValidRoleAndOriginalPrincipal()) {
+                        state = State.Failed;
+                        service.getPulsarStats().recordConnectionCreateFail();
+                        final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
+                        ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+                        return;
+                    }
+                }
                 completeConnect(clientProtocolVersion, clientVersion);
             } else {
                 // If the connection was already ready, it means we're doing a refresh
@@ -706,18 +706,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     }
                 }
             }
+        } else {
 
-            return State.Connected;
-        }
-
-        // auth not complete, continue auth with client side.
-        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Authentication in progress client by method {}.",
-                remoteAddress, authMethod);
-            log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
+            // auth not complete, continue auth with client side.
+            ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Authentication in progress client by method {}.",
+                        remoteAddress, authMethod);
+                log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
+            }
         }
-        return State.Connecting;
     }
 
     public void refreshAuthenticationCredentials() {
@@ -804,6 +802,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
+        state = State.Connecting;
+
         try {
             byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
             AuthData clientData = AuthData.of(authData);
@@ -851,8 +851,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 log.debug("[{}] Authenticate role : {}", remoteAddress, role);
             }
 
-            state = doAuthentication(clientData, clientProtocolVersion, clientVersion);
-
             // This will fail the check if:
             //  1. client is coming through a proxy
             //  2. we require to validate the original credentials
@@ -894,6 +892,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         remoteAddress, originalPrincipal);
                 }
             }
+            doAuthentication(clientData, false, clientProtocolVersion, clientVersion);
         } catch (Exception e) {
             service.getPulsarStats().recordConnectionCreateFail();
             logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
@@ -917,7 +916,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
-            doAuthentication(clientData, authResponse.getProtocolVersion(),
+            doAuthentication(clientData, originalAuthState != null, authResponse.getProtocolVersion(),
                     authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
         } catch (AuthenticationException e) {
             service.getPulsarStats().recordConnectionCreateFail();