You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/03/01 01:50:23 UTC

[GitHub] jiazhai commented on a change in pull request #3677: PIP-30: interface and mutual change authentication

jiazhai commented on a change in pull request #3677: PIP-30: interface and mutual change authentication
URL: https://github.com/apache/pulsar/pull/3677#discussion_r261161262
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
 ##########
 @@ -446,52 +454,98 @@ private String getOriginalPrincipal(String originalAuthData, String originalAuth
         return originalPrincipal;
     }
 
+    // called in handleConnect method below.
+    private void completeConnect(CommandConnect connect) {
+        ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+        state = State.Connected;
+        remoteEndpointProtocolVersion = connect.getProtocolVersion();
+        String version = connect.hasClientVersion() ? connect.getClientVersion() : null;
+        if (isNotBlank(version) && !version.contains(" ") /* ignore default version: pulsar client */) {
+            this.clientVersion = version.intern();
+        }
+    }
+
     @Override
     protected void handleConnect(CommandConnect connect) {
-        checkArgument(state == State.Start);
-        if (service.isAuthenticationEnabled()) {
-            try {
-                String authMethod = "none";
+        checkArgument(state == State.Start  || state == State.Connecting);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Received CONNECT from {}, auth enabled: {}",
+                remoteAddress, service.isAuthenticationEnabled());
+        }
+
+        if (!service.isAuthenticationEnabled()) {
+            completeConnect(connect);
+            return;
+        }
+
+        try {
+            AuthData clientData = AuthData.of(connect.getAuthData().toByteArray());
+
+            // init authentication
+            if (state == State.Start) {
                 if (connect.hasAuthMethodName()) {
                     authMethod = connect.getAuthMethodName();
                 } else if (connect.hasAuthMethod()) {
                     // Legacy client is passing enum
                     authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
+                } else {
+                    authMethod = "none";
+                }
+
+                authenticationProvider = getBrokerService()
+                    .getAuthenticationService()
+                    .getAuthenticationProvider(authMethod);
+
+                // Not find provider named authMethod. Most used for tests.
+                // In AuthenticationDisabled, it will set authMethod "none".
+                if (authenticationProvider == null) {
+                    authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole();
+                    completeConnect(connect);
+                    return;
                 }
 
-                String authData = connect.getAuthData().toStringUtf8();
+                // init authState and other var
                 ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
                 SSLSession sslSession = null;
                 if (sslHandler != null) {
                     sslSession = ((SslHandler) sslHandler).engine().getSession();
                 }
                 originalPrincipal = getOriginalPrincipal(
-                        connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null,
-                        connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
-                        connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
-                        sslSession);
-                authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
-                authRole = getBrokerService().getAuthenticationService()
-                        .authenticate(authenticationData, authMethod);
-
-                log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
-            } catch (AuthenticationException e) {
-                String msg = "Unable to authenticate";
-                log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
-                ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
-                close();
+                    connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null,
+                    connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
+                    connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
+                    sslSession);
+
+                authenticationData = authenticationProvider.getAuthDataSource(clientData, remoteAddress, sslSession);
+                authState = authenticationProvider.newAuthState(authenticationData);
+            }
+
+            AuthData brokerData = authState.authenticate(clientData);
 
 Review comment:
   Thanks, @ivankelly, I will add isComplete() to both AuthState and AuthData.
   In AuthState, it determines whether authentication is complete. In AuthData, it determines whether a response needs to be sent back.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services