You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/04/08 11:39:46 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #3997: Issue 3655: Kerberos authentication for proxy

eolivelli commented on a change in pull request #3997: Issue 3655: Kerberos authentication for proxy
URL: https://github.com/apache/pulsar/pull/3997#discussion_r273002218
 
 

 ##########
 File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 ##########
 @@ -182,54 +201,161 @@ public void operationComplete(Future<Void> future) throws Exception {
         }
     }
 
-    /**
-     * handles connect request and sends {@code State.Connected} ack to client
-     */
+    private void completeConnect() {
+        LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
+            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
+        if (hasProxyToBrokerUrl) {
+            // Client already knows which broker to connect. Let's open a
+            // connection there and just pass bytes in both directions
+            state = State.ProxyConnectionToBroker;
+            directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
+                protocolVersionToAdvertise, sslCtx);
+            cancelKeepAliveTask();
+        } else {
+            // Client is doing a lookup, we can consider the handshake complete
+            // and we'll take care of just topics and
+            // partitions metadata lookups
+            state = State.ProxyLookupRequests;
+            lookupProxyHandler = new LookupProxyHandler(service, this);
+            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
+        }
+    }
+
+    private void createClientAndCompleteConnect(AuthData clientData)
+        throws PulsarClientException {
+        if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+            this.clientAuthData = clientData;
+            this.clientAuthMethod = authMethod;
+        }
+        this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);
+
+        completeConnect();
+    }
+
+    // According to auth result, send newConnected or newAuthChallenge command.
+    private void doAuthentication(AuthData clientData) throws Exception {
+        AuthData brokerData = authState.authenticate(clientData);
+        // authentication has completed, will send newConnected command.
+        if (authState.isComplete()) {
+            clientAuthRole = authState.getAuthRole();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Client successfully authenticated with {} role {}",
+                    remoteAddress, authMethod, clientAuthRole);
+            }
+            createClientAndCompleteConnect(clientData);
+            return;
+        }
+
+        // auth not complete, continue auth with client side.
+        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("[{}] Authentication in progress client by method {}.",
+                remoteAddress, authMethod);
+        }
+        state = State.Connecting;
+        return;
+    }
+
     @Override
     protected void handleConnect(CommandConnect connect) {
         checkArgument(state == State.Init);
-        remoteEndpointProtocolVersion = connect.getProtocolVersion();
+        this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
+        this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
+        this.protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
+        this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null";
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Received CONNECT from {} proxyToBroker={}", remoteAddress,
-                    connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null");
+            LOG.debug("Received CONNECT from {} proxyToBroker={}", remoteAddress, proxyToBrokerUrl);
+            LOG.debug(
+                "[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
+                remoteAddress, protocolVersionToAdvertise, remoteEndpointProtocolVersion,
+                Commands.getCurrentProtocolVersion());
         }
 
-        // Client need to do some minimal cooperation logic.
         if (remoteEndpointProtocolVersion < PulsarApi.ProtocolVersion.v10_VALUE) {
             LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress);
             ctx.close();
             return;
         }
 
-        int protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                    "[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
-                    remoteAddress, protocolVersionToAdvertise, remoteEndpointProtocolVersion,
-                    Commands.getCurrentProtocolVersion());
-        }
+        try {
+            // init authn
+            this.clientConf = createClientConfiguration();
+            this.clientAuthentication = clientConf.getAuthentication();
+            int protocolVersion = getProtocolVersionToAdvertise(connect);
+
+            // authn not enabled, complete
+            if (!service.getConfiguration().isAuthenticationEnabled()) {
+                this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
+                    new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)));
+
+                completeConnect();
+                return;
+            }
+
+            AuthData clientData = AuthData.of(connect.getAuthData().toByteArray());
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            authenticationProvider = service
+                .getAuthenticationService()
+                .getAuthenticationProvider(authMethod);
+
+            // Not find provider named authMethod. Most used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".
+            if (authenticationProvider == null) {
+                clientAuthRole = service.getAuthenticationService().getAnonymousUserRole()
+                    .orElseThrow(() ->
+                        new AuthenticationException("No anonymous role, and no authentication provider configured"));
 
-        if (!authenticateAndCreateClient(connect)) {
+                createClientAndCompleteConnect(clientData);
+                return;
+            }
+
+            // init authState and other var
+            ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
+            SSLSession sslSession = null;
+            if (sslHandler != null) {
+                sslSession = ((SslHandler) sslHandler).engine().getSession();
+            }
+
+            authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
+            doAuthentication(clientData);
+        } catch (Exception e) {
+            e.printStackTrace();
 
 Review comment:
   nit: remove the `e.printStackTrace()` call; report the exception through `LOG` instead, as the rest of this class does.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services