You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/27 00:44:02 UTC

[GitHub] [pulsar] lin-zhao commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

lin-zhao commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980609442


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);
+    }
+
+    protected final void sendConnectCommand(String originalPrincipal, AuthData originalAuthData,
+                                            String originalAuthMethod) throws Exception {
+        // mutual authentication is to auth between `remoteHostName` and this client for this channel.
+        // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
+        // and return authData to server.
+        authenticationDataProvider = authentication.getAuthData(remoteHostName);

Review Comment:
   nit pick: 
   ```suggestion
           this.authenticationDataProvider = authentication.getAuthData(remoteHostName);
   ```



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
-        super(conf, eventLoopGroup);
+                          Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier,
+                          String clientAuthMethod,
+                          int protocolVersion, boolean forwardClientAuthData) {
+        super(conf, eventLoopGroup, protocolVersion);
         this.clientAuthRole = clientAuthRole;
-        this.clientAuthData = clientAuthData;
+        this.clientAuthDataSupplier = clientAuthDataSupplier;
         this.clientAuthMethod = clientAuthMethod;
-        this.protocolVersion = protocolVersion;
+        this.forwardClientAuthData = forwardClientAuthData;
     }
 
     @Override
-    protected ByteBuf newConnectCommand() throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
-                            + " clientAuthData = {}, clientAuthMethod = {}",
-                    clientAuthRole, clientAuthData, clientAuthMethod);
+    protected void completeActive() {
+        clientAuthDataSupplier.apply(false).thenAccept(clientAuthData -> {
+            try {
+                sendConnectCommand(clientAuthRole, clientAuthData, clientAuthMethod);
+            } catch (Exception e) {
+                log.error("{} Error during handshake", ctx.channel(), e);
+                close(e);
+            }
+        });
+    }
+
+    @Override
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.prepareMutualAuth(authChallenge);
+            return;
         }
 
-        authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        clientAuthDataSupplier.apply(true).thenAccept(originalClientAuthData -> {
+            sendMutualAuthCommand(clientAuthMethod, originalClientAuthData);
+        }).exceptionally(e -> {
+            log.error("{} Error mutual verify", ctx.channel(), e);

Review Comment:
   What's the reason to swallow any exception instead of throwing to the caller? This method declares `AuthenticationException`. Is it supposed to be thrown by this line? If yes you probably don't want to handle and ignore all exceptions here.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -1243,6 +1254,13 @@ public void close() {
        }
     }
 
+    public void close(Throwable e) {

Review Comment:
   Is there a particular reason to make this public? `close` as a method name is very generic.
   
   How about 
   ```suggestion
       private void closeWithException(Throwable e) {
   ```



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,

Review Comment:
   This is a backward incompatible change. Do you want to instead add a new overloaded method instead of changing the existing constructor? Otherwise there's argument this needs to be a major version bump.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org