You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/02/15 03:51:12 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] ServerCnx broken after recent cherry-picks (#19521)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 6132b46efae [fix][broker] ServerCnx broken after recent cherry-picks (#19521)
6132b46efae is described below
commit 6132b46efae60d87979966aca075b80ab7e2a87d
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Feb 14 19:35:24 2023 -0600
[fix][broker] ServerCnx broken after recent cherry-picks (#19521)
I broke all release branches when I cherry-picked 2847dd19f6e8a546f4d45bf51eb2b72aae0869ce to them. This change takes some of the underlying logic from #19409 without taking the async logic.
* Make changes to `ServerCnx` to make tests pass
Tests are currently failing on this branch, so a passing test run will show that this solution is correct.
- [x] `doc-not-needed`
(cherry picked from commit 8246da282ca38e891bdf8a4e9abc47f640b22384)
(cherry picked from commit 15e4198e19ebb2045777c696ac39f969b2a57f66)
---
.../apache/pulsar/broker/service/ServerCnx.java | 49 +++++++++++-----------
1 file changed, 24 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index be2386bd369..899bdd49626 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -626,15 +626,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion) {
- if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
- if (!isValidRoleAndOriginalPrincipal()) {
- state = State.Failed;
- service.getPulsarStats().recordConnectionCreateFail();
- final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
- ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
- return;
- }
- }
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
@@ -651,7 +642,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
// According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
+ private void doAuthentication(AuthData clientData,
+ boolean useOriginalAuthState,
int clientProtocolVersion,
String clientVersion) throws Exception {
@@ -659,8 +651,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// in presence of a proxy and if the proxy is forwarding the credentials).
// In this case, the re-validation needs to be done against the original client
// credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
+ AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
AuthData brokerData = authState.authenticate(clientData);
@@ -693,6 +684,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (state != State.Connected) {
// First time authentication is done
+ if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
+ if (!isValidRoleAndOriginalPrincipal()) {
+ state = State.Failed;
+ service.getPulsarStats().recordConnectionCreateFail();
+ final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
+ ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ }
completeConnect(clientProtocolVersion, clientVersion);
} else {
// If the connection was already ready, it means we're doing a refresh
@@ -706,18 +706,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
}
}
+ } else {
- return State.Connected;
- }
-
- // auth not complete, continue auth with client side.
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
- if (log.isDebugEnabled()) {
- log.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
- log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
+ // auth not complete, continue auth with client side.
+ ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authentication in progress client by method {}.",
+ remoteAddress, authMethod);
+ log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
+ }
}
- return State.Connecting;
}
public void refreshAuthenticationCredentials() {
@@ -804,6 +802,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
return;
}
+ state = State.Connecting;
+
try {
byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
AuthData clientData = AuthData.of(authData);
@@ -851,8 +851,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
log.debug("[{}] Authenticate role : {}", remoteAddress, role);
}
- state = doAuthentication(clientData, clientProtocolVersion, clientVersion);
-
// This will fail the check if:
// 1. client is coming through a proxy
// 2. we require to validate the original credentials
@@ -894,6 +892,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
remoteAddress, originalPrincipal);
}
}
+ doAuthentication(clientData, false, clientProtocolVersion, clientVersion);
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
@@ -917,7 +916,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
- doAuthentication(clientData, authResponse.getProtocolVersion(),
+ doAuthentication(clientData, originalAuthState != null, authResponse.getProtocolVersion(),
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();