You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/03/13 02:14:24 UTC

[pulsar] branch master updated: PIP-30: interface for mutual authentication (#3677)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09e3ed8  PIP-30: interface for mutual authentication (#3677)
09e3ed8 is described below

commit 09e3ed8aa15579d2fb265ce039b8f8bdb5b9f59f
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Mar 13 10:14:17 2019 +0800

    PIP-30: interface for mutual authentication (#3677)
    
    This implements the mutual auth API discussed in "PIP-30: change authentication provider API to support mutual authentication".
    It mainly adds two new commands, CommandAuthResponse and CommandAuthChallenge, to the proto to support it.
---
 .../authentication/AuthenticationDataSource.java   |   28 +-
 .../authentication/AuthenticationProvider.java     |   23 +-
 .../authentication/AuthenticationService.java      |   13 +
 .../broker/authentication/AuthenticationState.java |   53 +
 .../OneStageAuthenticationState.java               |   68 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  160 +-
 .../pulsar/broker/service/ServerCnxTest.java       |   27 +-
 .../client/api/MutualAuthenticationTest.java       |  240 +++
 .../apache/pulsar/client/api/Authentication.java   |   20 +-
 .../client/api/AuthenticationDataProvider.java     |   32 +-
 .../org/apache/pulsar/common/api/AuthData.java     |   34 +
 pulsar-client-cpp/lib/Commands.cc                  |    6 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   61 +-
 .../org/apache/pulsar/common/api/Commands.java     |   82 +
 .../apache/pulsar/common/api/PulsarDecoder.java    |   22 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 1590 ++++++++++++++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |   26 +
 17 files changed, 2411 insertions(+), 74 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
index 9fc6cbe..fcc6dda 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.broker.authentication;
 
+import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.cert.Certificate;
+import org.apache.pulsar.common.api.AuthData;
 
 /**
  * Interface for accessing data which are used in variety of authentication schemes on server side
@@ -31,7 +33,7 @@ public interface AuthenticationDataSource {
 
     /**
      * Check if data from TLS are available.
-     * 
+     *
      * @return true if this authentication data contain data from TLS
      */
     default boolean hasDataFromTls() {
@@ -39,7 +41,7 @@ public interface AuthenticationDataSource {
     }
 
     /**
-     * 
+     *
      * @return a client certificate chain, or null if the data are not available
      */
     default Certificate[] getTlsCertificates() {
@@ -52,7 +54,7 @@ public interface AuthenticationDataSource {
 
     /**
      * Check if data from HTTP are available.
-     * 
+     *
      * @return true if this authentication data contain data from HTTP
      */
     default boolean hasDataFromHttp() {
@@ -60,7 +62,7 @@ public interface AuthenticationDataSource {
     }
 
     /**
-     * 
+     *
      * @return a authentication scheme, or <code>null<c/ode> if the request is not be authenticated
      */
     default String getHttpAuthType() {
@@ -68,7 +70,7 @@ public interface AuthenticationDataSource {
     }
 
     /**
-     * 
+     *
      * @return a <code>String</code> containing the value of the specified header, or <code>null</code> if the header
      *         does not exist.
      */
@@ -82,7 +84,7 @@ public interface AuthenticationDataSource {
 
     /**
      * Check if data from Pulsar protocol are available.
-     * 
+     *
      * @return true if this authentication data contain data from Pulsar protocol
      */
     default boolean hasDataFromCommand() {
@@ -90,20 +92,28 @@ public interface AuthenticationDataSource {
     }
 
     /**
-     * 
+     *
      * @return authentication data which is stored in a command
      */
     default String getCommandData() {
         return null;
     }
 
+    /**
+     * Evaluate and challenge the data that is passed in, and return the processed data.
+     * It is used for mutual authentication like SASL.
+     */
+    default AuthData authenticate(AuthData data) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     /*
      * Peer
      */
 
     /**
      * Check if data from peer are available.
-     * 
+     *
      * @return true if this authentication data contain data from peer
      */
     default boolean hasDataFromPeer() {
@@ -111,7 +121,7 @@ public interface AuthenticationDataSource {
     }
 
     /**
-     * 
+     *
      * @return a <code>String</code> containing the IP address of the client
      */
     default SocketAddress getPeerAddress() {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
index 6b957ba..755fe86 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
@@ -21,9 +21,12 @@ package org.apache.pulsar.broker.authentication;
 import java.io.Closeable;
 import java.io.IOException;
 
+import java.net.SocketAddress;
 import javax.naming.AuthenticationException;
 
+import javax.net.ssl.SSLSession;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.AuthData;
 
 /**
  * Provider of authentication mechanism
@@ -46,14 +49,28 @@ public interface AuthenticationProvider extends Closeable {
     String getAuthMethodName();
 
     /**
-     * Validate the authentication for the given credentials with the specified authentication data
-     * 
+     * Validate the authentication for the given credentials with the specified authentication data.
+     * This method is used for one-stage authn. If you're not doing one-stage authn, or if you're
+     * providing your own state implementation for one-stage authn, it should throw an exception.
+     *
      * @param authData
      *            provider specific authentication data
      * @return the "role" string for the authenticated connection, if the authentication was successful
      * @throws AuthenticationException
      *             if the credentials are not valid
      */
-    String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
+    default String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+        throw new AuthenticationException("Not supported");
+    }
+
+    /**
+     * Create an authentication state from the passed-in AuthData, remote address and SSL session.
+     */
+    default AuthenticationState newAuthState(AuthData authData,
+                                             SocketAddress remoteAddress,
+                                             SSLSession sslSession)
+        throws AuthenticationException{
+        return new OneStageAuthenticationState(authData, remoteAddress, sslSession, this);
+    }
 
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
index 930f3d2..2822517 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 
+import java.util.Optional;
 import javax.naming.AuthenticationException;
 import javax.servlet.http.HttpServletRequest;
 
@@ -105,6 +106,18 @@ public class AuthenticationService implements Closeable {
         }
     }
 
+    public AuthenticationProvider getAuthenticationProvider(String authMethodName) {
+        return providers.get(authMethodName);
+    }
+
+    // called when authn enabled, but no authentication provided
+    public Optional<String> getAnonymousUserRole() {
+        if (StringUtils.isNotBlank(anonymousUserRole)) {
+            return Optional.of(anonymousUserRole);
+        }
+        return Optional.empty();
+    }
+
     @Override
     public void close() throws IOException {
         for (AuthenticationProvider provider : providers.values()) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
new file mode 100644
index 0000000..4248b6b
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.common.api.AuthData;
+
+/**
+ * Interface for authentication state.
+ *
+ * It tells the broker whether the authentication is completed or not and,
+ * if completed, what the AuthRole is.
+ */
+public interface AuthenticationState {
+    /**
+     * After the authentication between client and broker has completed,
+     * get the authentication role that represents the client.
+     * It should throw an exception if the authentication is not complete.
+     */
+    String getAuthRole() throws AuthenticationException;
+
+    /**
+     * Challenge the passed-in auth data and get the response data.
+     */
+    AuthData authenticate(AuthData authData) throws AuthenticationException;
+
+    /**
+     * Return AuthenticationDataSource.
+     */
+    AuthenticationDataSource getAuthDataSource();
+
+    /**
+     * Whether the authentication is completed or not
+     */
+    boolean isComplete();
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
new file mode 100644
index 0000000..06b1749
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import java.net.SocketAddress;
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import org.apache.pulsar.common.api.AuthData;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Authentication state for one-stage authentication.
+ *
+ * The provider authenticates the client data once, in the constructor,
+ * so this state is always complete and the AuthRole is available immediately.
+ */
+public class OneStageAuthenticationState implements AuthenticationState {
+
+    private final AuthenticationDataSource authenticationDataSource;
+    private final String authRole;
+
+    public OneStageAuthenticationState(AuthData authData,
+                                       SocketAddress remoteAddress,
+                                       SSLSession sslSession,
+                                       AuthenticationProvider provider) throws AuthenticationException {
+        this.authenticationDataSource = new AuthenticationDataCommand(
+            new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);;
+        this.authRole = provider.authenticate(authenticationDataSource);
+    }
+
+    @Override
+    public String getAuthRole() {
+        return authRole;
+    }
+
+    @Override
+    public AuthenticationDataSource getAuthDataSource() {
+        return authenticationDataSource;
+    }
+
+    @Override
+    public AuthData authenticate(AuthData authData) {
+        return null;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return true;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 69db016..bcd739f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -53,6 +53,8 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -63,11 +65,13 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.CommandUtils;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
@@ -115,6 +119,8 @@ public class ServerCnx extends PulsarHandler {
     private volatile boolean isActive = true;
     String authRole = null;
     AuthenticationDataSource authenticationData;
+    AuthenticationProvider authenticationProvider;
+    AuthenticationState authState;
 
     // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
     // control done by a single producer might not be enough to prevent write spikes on the broker.
@@ -129,9 +135,10 @@ public class ServerCnx extends PulsarHandler {
     private Set<String> proxyRoles;
     private boolean authenticateOriginalAuthData;
     private final boolean schemaValidationEnforced;
+    private String authMethod = "none";
 
     enum State {
-        Start, Connected, Failed
+        Start, Connected, Failed, Connecting
     }
 
     public ServerCnx(PulsarService pulsar) {
@@ -253,7 +260,7 @@ public class ServerCnx extends PulsarHandler {
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) {
                 isProxyAuthorizedFuture = service.getAuthorizationService().canLookupAsync(topicName, authRole,
-                        authenticationData);
+                    authenticationData);
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
@@ -446,52 +453,127 @@ public class ServerCnx extends PulsarHandler {
         return originalPrincipal;
     }
 
+    // complete the connect and send the newConnected command
+    private void completeConnect(int clientProtoVersion, String clientVersion) {
+        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion));
+        state = State.Connected;
+        remoteEndpointProtocolVersion = clientProtoVersion;
+        if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
+            this.clientVersion = clientVersion.intern();
+        }
+    }
+
+    // According to auth result, send newConnected or newAuthChallenge command.
+    private void doAuthentication(AuthData clientData,
+                                  int clientProtocolVersion,
+                                  String clientVersion) throws Exception {
+        AuthData brokerData = authState.authenticate(clientData);
+        // authentication has completed, will send newConnected command.
+        if (authState.isComplete()) {
+            authRole = authState.getAuthRole();
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
+                    remoteAddress, authMethod, authRole, originalPrincipal);
+            }
+            completeConnect(clientProtocolVersion, clientVersion);
+            return;
+        }
+
+        // auth not complete, continue auth with client side.
+        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Authentication in progress client by method {}.",
+                remoteAddress, authMethod);
+        }
+        state = State.Connecting;
+        return;
+    }
+
     @Override
     protected void handleConnect(CommandConnect connect) {
         checkArgument(state == State.Start);
-        if (service.isAuthenticationEnabled()) {
-            try {
-                String authMethod = "none";
-                if (connect.hasAuthMethodName()) {
-                    authMethod = connect.getAuthMethodName();
-                } else if (connect.hasAuthMethod()) {
-                    // Legacy client is passing enum
-                    authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
-                }
 
-                String authData = connect.getAuthData().toStringUtf8();
-                ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
-                SSLSession sslSession = null;
-                if (sslHandler != null) {
-                    sslSession = ((SslHandler) sslHandler).engine().getSession();
-                }
-                originalPrincipal = getOriginalPrincipal(
-                        connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null,
-                        connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
-                        connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
-                        sslSession);
-                authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
-                authRole = getBrokerService().getAuthenticationService()
-                        .authenticate(authenticationData, authMethod);
-
-                log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
-            } catch (AuthenticationException e) {
-                String msg = "Unable to authenticate";
-                log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
-                ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
-                close();
+        if (log.isDebugEnabled()) {
+            log.debug("Received CONNECT from {}, auth enabled: {}",
+                remoteAddress, service.isAuthenticationEnabled());
+        }
+
+        String clientVersion = connect.getClientVersion();
+        int clientProtocolVersion = connect.getProtocolVersion();
+
+        if (!service.isAuthenticationEnabled()) {
+            completeConnect(clientProtocolVersion, clientVersion);
+            return;
+        }
+
+        try {
+            AuthData clientData = AuthData.of(connect.getAuthData().toByteArray());
+
+            // init authentication
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            authenticationProvider = getBrokerService()
+                .getAuthenticationService()
+                .getAuthenticationProvider(authMethod);
+
+            // No provider named authMethod was found. Mostly used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".
+            if (authenticationProvider == null) {
+                authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole()
+                    .orElseThrow(() ->
+                        new AuthenticationException("No anonymous role, and no authentication provider configured"));
+                completeConnect(clientProtocolVersion, clientVersion);
                 return;
             }
+
+            // init authState and other var
+            ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
+            SSLSession sslSession = null;
+            if (sslHandler != null) {
+                sslSession = ((SslHandler) sslHandler).engine().getSession();
+            }
+            originalPrincipal = getOriginalPrincipal(
+                connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null,
+                connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
+                connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
+                sslSession);
+
+            authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
+            doAuthentication(clientData, clientProtocolVersion, clientVersion);
+        } catch (Exception e) {
+            String msg = "Unable to authenticate";
+            log.warn("[{}] {} ", remoteAddress, msg, e);
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
+            close();
         }
+    }
+
+    @Override
+    protected void handleAuthResponse(CommandAuthResponse authResponse) {
+        checkArgument(state == State.Connecting);
+        checkArgument(authResponse.hasResponse());
+        checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
+
         if (log.isDebugEnabled()) {
-            log.debug("Received CONNECT from {}", remoteAddress);
+            log.debug("Received AuthResponse from {}, auth method: {}",
+                remoteAddress, authResponse.getResponse().getAuthMethodName());
         }
-        ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
-        state = State.Connected;
-        remoteEndpointProtocolVersion = connect.getProtocolVersion();
-        String version = connect.hasClientVersion() ? connect.getClientVersion() : null;
-        if (isNotBlank(version) && !version.contains(" ") /* ignore default version: pulsar client */) {
-            this.clientVersion = version.intern();
+
+        try {
+            AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData().toByteArray());
+            doAuthentication(clientData, authResponse.getProtocolVersion(), authResponse.getClientVersion());
+        } catch (Exception e) {
+            String msg = "Unable to handleAuthResponse";
+            log.warn("[{}] {} ", remoteAddress, msg, e);
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
+            close();
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 545817f..b98cd72 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -67,8 +67,10 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -78,6 +80,7 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
@@ -334,9 +337,23 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testConnectCommandWithAuthenticationPositive() throws Exception {
         AuthenticationService authenticationService = mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = mock(AuthenticationProvider.class);
+        AuthenticationState authenticationState = mock(AuthenticationState.class);
+        AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
+        AuthData authData = AuthData.of(null);
+
         doReturn(authenticationService).when(brokerService).getAuthenticationService();
-        doReturn("appid1").when(authenticationService).authenticate(new AuthenticationDataCommand(Mockito.anyString()),
-                Mockito.anyString());
+        doReturn(authenticationProvider).when(authenticationService).getAuthenticationProvider(Mockito.anyString());
+        doReturn(authenticationState).when(authenticationProvider)
+            .newAuthState(Mockito.anyObject(), Mockito.anyObject(), Mockito.anyObject());
+        doReturn(authData).when(authenticationState)
+            .authenticate(authData);
+        doReturn(true).when(authenticationState)
+            .isComplete();
+
+        doReturn("appid1").when(authenticationState)
+            .getAuthRole();
+
         doReturn(true).when(brokerService).isAuthenticationEnabled();
 
         resetChannel();
@@ -354,11 +371,9 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testConnectCommandWithAuthenticationNegative() throws Exception {
-        AuthenticationException e = new AuthenticationException();
         AuthenticationService authenticationService = mock(AuthenticationService.class);
         doReturn(authenticationService).when(brokerService).getAuthenticationService();
-        doThrow(e).when(authenticationService).authenticate(new AuthenticationDataCommand(Mockito.anyString()),
-                Mockito.anyString());
+        doReturn(Optional.empty()).when(authenticationService).getAnonymousUserRole();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
 
         resetChannel();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
new file mode 100644
index 0000000..b247622
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Test Mutual Authentication.
+ * Verifies that the connection is established successfully and that producing and consuming work.
+ */
+public class MutualAuthenticationTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(MutualAuthenticationTest.class);
+
+    private MutualAuthentication mutualAuth;
+
+    private static String[] clientAuthStrings = {
+        "MutualClientAuthInit", // step 0
+        "MutualClientStep1"     // step 1
+    };
+
+    private static String[] serverAuthStrings = {
+        "ResponseMutualClientAuthInit", // step 0
+    };
+
+    public static class MutualAuthenticationDataProvider implements AuthenticationDataProvider {
+        @Override
+        public boolean hasDataFromCommand() {
+            return true;
+        }
+
+        @Override
+        public AuthData authenticate(AuthData data) throws AuthenticationException {
+            // Match on the raw bytes; decoding and re-encoding with the platform charset is not a safe round trip.
+            byte[] received = data.getBytes();
+            AuthData toSend;
+            if (Arrays.equals(received, AuthData.INIT_AUTH_DATA)) {
+                toSend = AuthData.of(clientAuthStrings[0].getBytes(UTF_8));
+            } else if (Arrays.equals(received, serverAuthStrings[0].getBytes(UTF_8))) {
+                toSend = AuthData.of(clientAuthStrings[1].getBytes(UTF_8));
+            } else {
+                throw new AuthenticationException();
+            }
+
+            log.debug("authenticate in client. passed in :{}, send: {}",
+                new String(received, UTF_8), new String(toSend.getBytes(), UTF_8));
+            return toSend;
+        }
+    }
+
+    public static class MutualAuthentication implements Authentication {
+        @Override
+        public void close() throws IOException {
+            // noop
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "MutualAuthentication";
+        }
+
+        /**
+         * Returns a new data provider on every call; the {@code broker} host name is not used
+         * by this test implementation.
+         */
+        @Override
+        public AuthenticationDataProvider getAuthData(String broker) throws PulsarClientException {
+            return new MutualAuthenticationDataProvider();
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // noop
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            // noop
+        }
+    }
+
+
+    public static class MutualAuthenticationState implements AuthenticationState {
+        private boolean isComplete = false;
+
+        @Override
+        public String getAuthRole() throws AuthenticationException {
+            return "admin";
+        }
+
+        @Override
+        public AuthData authenticate(AuthData authData) throws AuthenticationException {
+            // Match on the raw bytes; decoding and re-encoding with the platform charset is not a safe round trip.
+            byte[] received = authData.getBytes();
+            AuthData toSend;
+            if (Arrays.equals(received, clientAuthStrings[0].getBytes(UTF_8))) {
+                toSend = AuthData.of(serverAuthStrings[0].getBytes(UTF_8));
+            } else if (Arrays.equals(received, clientAuthStrings[1].getBytes(UTF_8))) {
+                isComplete = true;
+                toSend = AuthData.of(null);
+            } else {
+                throw new AuthenticationException();
+            }
+
+            log.debug("authenticate in server. passed in :{}, send: {}",
+                new String(received, UTF_8), toSend.getBytes() == null ? "null" : new String(toSend.getBytes(), UTF_8));
+            return toSend;
+        }
+
+        @Override
+        public AuthenticationDataSource getAuthDataSource() {
+            return null;
+        }
+
+        @Override
+        public boolean isComplete() {
+            return isComplete;
+        }
+    }
+
+    public static class MutualAuthenticationProvider implements AuthenticationProvider {
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration config) throws IOException {
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "MutualAuthentication";
+        }
+
+        @Override
+        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+            return "admin";
+        }
+
+        @Override
+        public AuthenticationState newAuthState(AuthData authData,
+                                                SocketAddress remoteAddress,
+                                                SSLSession sslSession) {
+            return new MutualAuthenticationState();
+        }
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        mutualAuth = new MutualAuthentication();
+        Set<String> superUserRoles = new HashSet<String>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        Set<String> providersClassNames = Sets.newHashSet(MutualAuthenticationProvider.class.getName());
+        conf.setAuthenticationProviders(providersClassNames);
+
+        super.init();
+        URI brokerServiceUrl = new URI("pulsar://localhost:" + BROKER_PORT);
+        pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString())
+            .authentication(mutualAuth)
+            .build();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testAuthentication() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
+            .subscriptionName("my-subscriber-name")
+            .subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+            .topic("persistent://my-property/my-ns/my-topic1")
+            .create();
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes(UTF_8));
+        }
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            org.testng.Assert.assertNotNull(msg, "Timed out waiting for message " + i);
+            String receivedMessage = new String(msg.getData(), UTF_8);
+            log.debug("Received message: [{}]", receivedMessage);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
+        }
+        consumer.acknowledgeCumulative(msg);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
index b3a2172..8cb4407 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.client.api;
 import java.io.Closeable;
 import java.io.Serializable;
 import java.util.Map;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 
 /**
  * Interface of authentication providers.
@@ -40,7 +42,23 @@ public interface Authentication extends Closeable, Serializable {
      * @throws PulsarClientException
      *             any other error
      */
-    AuthenticationDataProvider getAuthData() throws PulsarClientException;
+    default AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        throw new UnsupportedAuthenticationException("Method not implemented!");
+    }
+
+    /**
+     *
+     * Get/Create an authentication data provider which provides the data that this client will send to the broker.
+     * Some authentication methods authenticate each client channel separately, so they need to know which broker the channel talks to.
+     *
+     * @param brokerHostName
+     *          target broker host name
+     *
+     * @return The authentication data provider
+     */
+    default AuthenticationDataProvider getAuthData(String brokerHostName) throws PulsarClientException {
+        return this.getAuthData();
+    }
 
     /**
      * Configure the authentication plugins with the supplied parameters
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index e841d4e..ecf3d3f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -18,11 +18,16 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.security.PrivateKey;
 import java.security.cert.Certificate;
 import java.util.Map;
 import java.util.Set;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.common.api.AuthData;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
  * Interface for accessing data which are used in variety of authentication schemes on client side
@@ -34,7 +39,7 @@ public interface AuthenticationDataProvider extends Serializable {
 
     /**
      * Check if data for TLS are available.
-     * 
+     *
      * @return true if this authentication data contain data for TLS
      */
     default boolean hasDataForTls() {
@@ -42,7 +47,7 @@ public interface AuthenticationDataProvider extends Serializable {
     }
 
     /**
-     * 
+     *
      * @return a client certificate chain, or null if the data are not available
      */
     default Certificate[] getTlsCertificates() {
@@ -50,7 +55,7 @@ public interface AuthenticationDataProvider extends Serializable {
     }
 
     /**
-     * 
+     *
      * @return a private key for the client certificate, or null if the data are not available
      */
     default PrivateKey getTlsPrivateKey() {
@@ -63,7 +68,7 @@ public interface AuthenticationDataProvider extends Serializable {
 
     /**
      * Check if data for HTTP are available.
-     * 
+     *
      * @return true if this authentication data contain data for HTTP
      */
     default boolean hasDataForHttp() {
@@ -71,7 +76,7 @@ public interface AuthenticationDataProvider extends Serializable {
     }
 
     /**
-     * 
+     *
      * @return a authentication scheme, or <code>null<c/ode> if the request will not be authenticated
      */
     default String getHttpAuthType() {
@@ -79,7 +84,7 @@ public interface AuthenticationDataProvider extends Serializable {
     }
 
     /**
-     * 
+     *
      * @return an enumeration of all the header names
      */
     default Set<Map.Entry<String, String>> getHttpHeaders() {
@@ -92,7 +97,7 @@ public interface AuthenticationDataProvider extends Serializable {
 
     /**
      * Check if data from Pulsar protocol are available.
-     * 
+     *
      * @return true if this authentication data contain data from Pulsar protocol
      */
     default boolean hasDataFromCommand() {
@@ -100,11 +105,22 @@ public interface AuthenticationDataProvider extends Serializable {
     }
 
     /**
-     * 
+     *
      * @return authentication data which will be stored in a command
      */
     default String getCommandData() {
         return null;
     }
 
+    /**
+     * For mutual authentication, evaluates the challenge passed in {@code data} and returns the
+     * data to send back to the server side, or null once authentication has completed.
+     * The default implementation ignores {@code data} and wraps {@link #getCommandData()}, UTF-8
+     * encoded; an empty payload is used when no command data is available.
+     * Mainly used for mutual authentication like sasl.
+     */
+    default AuthData authenticate(AuthData data) throws IOException, AuthenticationException {
+        String commandData = hasDataFromCommand() ? this.getCommandData() : null;
+        return AuthData.of((commandData != null ? commandData : "").getBytes(UTF_8));
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
new file mode 100644
index 0000000..ccf1e7f
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api;
+
+import lombok.Data;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@Data(staticConstructor="of")
+public final class AuthData {
+    public static final byte[] INIT_AUTH_DATA = "PulsarAuthInit".getBytes(UTF_8);
+    // Payload of one authentication step; may be null, which isComplete() reports as "complete".
+    private final byte[] bytes;
+    /** @return true when this instance carries no payload ({@code bytes} is null). */
+    public boolean isComplete() {
+        return bytes == null;
+    }
+}
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 726939f..06b96f9 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -515,6 +515,12 @@ std::string Commands::messageType(BaseCommand_Type type) {
         case BaseCommand::GET_SCHEMA_RESPONSE:
             return "GET_SCHEMA_RESPONSE";
             break;
+        case BaseCommand::AUTH_CHALLENGE:
+            return "AUTH_CHALLENGE";
+            break;
+        case BaseCommand::AUTH_RESPONSE:
+            return "AUTH_RESPONSE";
+            break;
     };
 }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bb8b98d..5f2dae0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
@@ -39,13 +41,16 @@ import javax.net.ssl.SSLSession;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
@@ -125,8 +130,11 @@ public class ClientCnx extends PulsarHandler {
 
     private final ScheduledFuture<?> timeoutTask;
 
+    // Added for mutual authentication.
+    private AuthenticationDataProvider authenticationDataProvider;
+
     enum State {
-        None, SentConnectFrame, Ready, Failed
+        None, SentConnectFrame, Ready, Failed, Connecting
     }
 
     static class RequestTime {
@@ -187,13 +195,14 @@ public class ClientCnx extends PulsarHandler {
                 });
     }
 
-    protected ByteBuf newConnectCommand() throws PulsarClientException {
-        String authData = "";
-        if (authentication.getAuthData().hasDataFromCommand()) {
-            authData = authentication.getAuthData().getCommandData();
-        }
+    protected ByteBuf newConnectCommand() throws Exception {
+        // mutual authentication is to auth between `remoteHostName` and this client for this channel.
+        // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
+        // and return authData to server.
+        authenticationDataProvider = authentication.getAuthData(remoteHostName);
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-                getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null);
+            getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null);
     }
 
     @Override
@@ -265,7 +274,7 @@ public class ClientCnx extends PulsarHandler {
             return;
         }
 
-        checkArgument(state == State.SentConnectFrame);
+        checkArgument(state == State.SentConnectFrame || state == State.Connecting);
 
         if (log.isDebugEnabled()) {
             log.debug("{} Connection is ready", ctx.channel());
@@ -277,6 +286,42 @@ public class ClientCnx extends PulsarHandler {
     }
 
     @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        // mutual authn: feed the broker's challenge to the provider, which must return more data to send back.
+        try {
+            AuthData authData = authenticationDataProvider
+                .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData().toByteArray()));
+
+            checkState(!authData.isComplete());
+
+            ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
+                authData,
+                this.protocolVersion,
+                getPulsarClientVersion());
+
+            if (log.isDebugEnabled()) {
+                log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
+            }
+
+            ctx.writeAndFlush(request).addListener(writeFuture -> {
+                if (!writeFuture.isSuccess()) {
+                    log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
+                        writeFuture.cause().getMessage());
+                    connectionFuture.completeExceptionally(writeFuture.cause());
+                }
+            });
+            state = State.Connecting;
+        } catch (Exception e) {
+            // The exception is the trailing argument with no placeholder, so SLF4J logs it with its stack trace.
+            log.error("{} Error mutual verify", ctx.channel(), e);
+            connectionFuture.completeExceptionally(e);
+        }
+    }
+
+    @Override
     protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
         checkArgument(state == State.Ready);
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b047b56..afaec8c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -45,6 +45,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
@@ -150,6 +152,41 @@ public class Commands {
         return res;
     }
 
+    public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion,
+                                     String targetBroker, String originalPrincipal, String originalAuthData,
+                                     String originalAuthMethod) {
+        CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
+        connectBuilder.setClientVersion(libVersion != null ? libVersion : "Pulsar Client");
+        connectBuilder.setAuthMethodName(authMethodName);
+
+        if (targetBroker != null) {
+            // When connecting through a proxy, we need to specify which broker do we want to be proxied through
+            connectBuilder.setProxyToBrokerUrl(targetBroker);
+        }
+        // An AuthData may wrap null bytes; leave the field unset in that case instead of failing in copyFrom.
+        if (authData != null && authData.getBytes() != null) {
+            connectBuilder.setAuthData(ByteString.copyFrom(authData.getBytes()));
+        }
+
+        if (originalPrincipal != null) {
+            connectBuilder.setOriginalPrincipal(originalPrincipal);
+        }
+
+        if (originalAuthData != null) {
+            connectBuilder.setOriginalAuthData(originalAuthData);
+        }
+
+        if (originalAuthMethod != null) {
+            connectBuilder.setOriginalAuthMethod(originalAuthMethod);
+        }
+        connectBuilder.setProtocolVersion(protocolVersion);
+        CommandConnect connect = connectBuilder.build();
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect));
+        connect.recycle();
+        connectBuilder.recycle();
+        return res;
+    }
+
     public static ByteBuf newConnected(int clientProtocolVersion) {
         CommandConnected.Builder connectedBuilder = CommandConnected.newBuilder();
         connectedBuilder.setServerVersion("Pulsar Server");
@@ -168,6 +205,51 @@ public class Commands {
         return res;
     }
 
+    public static ByteBuf newAuthChallenge(String authMethod, AuthData brokerData, int clientProtocolVersion) {
+        CommandAuthChallenge.Builder challengeBuilder = CommandAuthChallenge.newBuilder();
+
+        // If the broker supports a newer version of the protocol, it will anyway advertise the max version that the
+        // client supports, to avoid confusing the client.
+        int currentProtocolVersion = getCurrentProtocolVersion();
+        int versionToAdvertise = Math.min(currentProtocolVersion, clientProtocolVersion);
+
+        challengeBuilder.setProtocolVersion(versionToAdvertise);
+
+        CommandAuthChallenge challenge = challengeBuilder
+            .setChallenge(PulsarApi.AuthData.newBuilder()
+                .setAuthData(copyFrom(brokerData.getBytes()))
+                .setAuthMethodName(authMethod)
+                .build())
+            .build();
+
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.AUTH_CHALLENGE).setAuthChallenge(challenge));
+        challenge.recycle();
+        challengeBuilder.recycle();
+        return res;
+    }
+
+    public static ByteBuf newAuthResponse(String authMethod,
+                                           AuthData clientData,
+                                           int clientProtocolVersion,
+                                           String clientVersion) {
+        CommandAuthResponse.Builder responseBuilder  = CommandAuthResponse.newBuilder();
+
+        responseBuilder.setClientVersion(clientVersion != null ? clientVersion : "Pulsar Client");
+        responseBuilder.setProtocolVersion(clientProtocolVersion);
+
+        CommandAuthResponse response = responseBuilder
+            .setResponse(PulsarApi.AuthData.newBuilder()
+                .setAuthData(copyFrom(clientData.getBytes()))
+                .setAuthMethodName(authMethod)
+                .build())
+            .build();
+
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.AUTH_RESPONSE).setAuthResponse(response));
+        response.recycle();
+        responseBuilder.recycle();
+        return res;
+    }
+
     public static ByteBuf newSuccess(long requestId) {
         CommandSuccess.Builder successBuilder = CommandSuccess.newBuilder();
         successBuilder.setRequestId(requestId);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index f8c2d61..0e1ea73 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -28,6 +28,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
@@ -303,6 +305,18 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
                 handleGetSchemaResponse(cmd.getGetSchemaResponse());
                 cmd.getGetSchemaResponse().recycle();
                 break;
+
+            case AUTH_CHALLENGE:
+                checkArgument(cmd.hasAuthChallenge());
+                handleAuthChallenge(cmd.getAuthChallenge());
+                cmd.getAuthChallenge().recycle();
+                break;
+
+            case AUTH_RESPONSE:
+                checkArgument(cmd.hasAuthResponse());
+                handleAuthResponse(cmd.getAuthResponse());
+                cmd.getAuthResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -454,5 +468,13 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleAuthResponse(CommandAuthResponse commandAuthResponse) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleAuthChallenge(CommandAuthChallenge commandAuthChallenge) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 2940a99..57dd32f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -207,6 +207,7 @@ public final class PulsarApi {
     v11(11, 11),
     v12(12, 12),
     v13(13, 13),
+    v14(14, 14),
     ;
     
     public static final int v0_VALUE = 0;
@@ -223,6 +224,7 @@ public final class PulsarApi {
     public static final int v11_VALUE = 11;
     public static final int v12_VALUE = 12;
     public static final int v13_VALUE = 13;
+    public static final int v14_VALUE = 14;
     
     
     public final int getNumber() { return value; }
@@ -243,6 +245,7 @@ public final class PulsarApi {
         case 11: return v11;
         case 12: return v12;
         case 13: return v13;
+        case 14: return v14;
         default: return null;
       }
     }
@@ -6786,6 +6789,1419 @@ public final class PulsarApi {
     // @@protoc_insertion_point(class_scope:pulsar.proto.CommandConnected)
   }
   
+  public interface CommandAuthResponseOrBuilder
+      extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // optional string client_version = 1;
+    boolean hasClientVersion();
+    String getClientVersion();
+    
+    // optional .pulsar.proto.AuthData response = 2;
+    boolean hasResponse();
+    org.apache.pulsar.common.api.proto.PulsarApi.AuthData getResponse();
+    
+    // optional int32 protocol_version = 3 [default = 0];
+    boolean hasProtocolVersion();
+    int getProtocolVersion();
+  }
+  public static final class CommandAuthResponse extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements CommandAuthResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use CommandAuthResponse.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private CommandAuthResponse(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandAuthResponse> RECYCLER = new io.netty.util.Recycler<CommandAuthResponse>() {
+            protected CommandAuthResponse newObject(Handle handle) {
+              return new CommandAuthResponse(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private CommandAuthResponse(boolean noInit) {}
+    
+    private static final CommandAuthResponse defaultInstance;
+    public static CommandAuthResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandAuthResponse getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // optional string client_version = 1;
+    public static final int CLIENT_VERSION_FIELD_NUMBER = 1;
+    private java.lang.Object clientVersion_;
+    public boolean hasClientVersion() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getClientVersion() {
+      java.lang.Object ref = clientVersion_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          clientVersion_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getClientVersionBytes() {
+      java.lang.Object ref = clientVersion_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+        clientVersion_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+      }
+    }
+    
+    // optional .pulsar.proto.AuthData response = 2;
+    public static final int RESPONSE_FIELD_NUMBER = 2;
+    private org.apache.pulsar.common.api.proto.PulsarApi.AuthData response_;
+    public boolean hasResponse() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.AuthData getResponse() {
+      return response_;
+    }
+    
+    // optional int32 protocol_version = 3 [default = 0];
+    public static final int PROTOCOL_VERSION_FIELD_NUMBER = 3;
+    private int protocolVersion_;
+    public boolean hasProtocolVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public int getProtocolVersion() {
+      return protocolVersion_;
+    }
+    
+    private void initFields() {
+      clientVersion_ = "";
+      response_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+      protocolVersion_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getClientVersionBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(2, response_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, protocolVersion_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(1, getClientVersionBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(2, response_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(3, protocolVersion_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(byte[] data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(
+        byte[] data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseDelimitedFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        clientVersion_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        response_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        protocolVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse buildParsed()
+          throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse result = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.clientVersion_ = clientVersion_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.response_ = response_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.protocolVersion_ = protocolVersion_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse other) {
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.getDefaultInstance()) return this;
+        if (other.hasClientVersion()) {
+          setClientVersion(other.getClientVersion());
+        }
+        if (other.hasResponse()) {
+          mergeResponse(other.getResponse());
+        }
+        if (other.hasProtocolVersion()) {
+          setProtocolVersion(other.getProtocolVersion());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+                              org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              clientVersion_ = input.readBytes();
+              break;
+            }
+            case 18: {
+              org.apache.pulsar.common.api.proto.PulsarApi.AuthData.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.newBuilder();
+              if (hasResponse()) {
+                subBuilder.mergeFrom(getResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              protocolVersion_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // optional string client_version = 1;
+      private java.lang.Object clientVersion_ = "";
+      public boolean hasClientVersion() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getClientVersion() {
+        java.lang.Object ref = clientVersion_;
+        if (!(ref instanceof String)) {
+          String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+          clientVersion_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setClientVersion(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        clientVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearClientVersion() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        clientVersion_ = getDefaultInstance().getClientVersion();
+        
+        return this;
+      }
+      void setClientVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+        bitField0_ |= 0x00000001;
+        clientVersion_ = value;
+        
+      }
+      
+      // optional .pulsar.proto.AuthData response = 2;
+      private org.apache.pulsar.common.api.proto.PulsarApi.AuthData response_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+      public boolean hasResponse() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.AuthData getResponse() {
+        return response_;
+      }
+      public Builder setResponse(org.apache.pulsar.common.api.proto.PulsarApi.AuthData value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        response_ = value;
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder setResponse(
+          org.apache.pulsar.common.api.proto.PulsarApi.AuthData.Builder builderForValue) {
+        response_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder mergeResponse(org.apache.pulsar.common.api.proto.PulsarApi.AuthData value) {
+        if (((bitField0_ & 0x00000002) == 0x00000002) &&
+            response_ != org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance()) {
+          response_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.AuthData.newBuilder(response_).mergeFrom(value).buildPartial();
+        } else {
+          response_ = value;
+        }
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder clearResponse() {
+        response_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      // optional int32 protocol_version = 3 [default = 0];
+      private int protocolVersion_ ;
+      public boolean hasProtocolVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getProtocolVersion() {
+        return protocolVersion_;
+      }
+      public Builder setProtocolVersion(int value) {
+        bitField0_ |= 0x00000004;
+        protocolVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearProtocolVersion() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        protocolVersion_ = 0;
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandAuthResponse)
+    }
+    
+    static {
+      defaultInstance = new CommandAuthResponse(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandAuthResponse)
+  }
+  
+  public interface CommandAuthChallengeOrBuilder
+      extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // optional string server_version = 1;
+    boolean hasServerVersion();
+    String getServerVersion();
+    
+    // optional .pulsar.proto.AuthData challenge = 2;
+    boolean hasChallenge();
+    org.apache.pulsar.common.api.proto.PulsarApi.AuthData getChallenge();
+    
+    // optional int32 protocol_version = 3 [default = 0];
+    boolean hasProtocolVersion();
+    int getProtocolVersion();
+  }
+  public static final class CommandAuthChallenge extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements CommandAuthChallengeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use CommandAuthChallenge.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private CommandAuthChallenge(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandAuthChallenge> RECYCLER = new io.netty.util.Recycler<CommandAuthChallenge>() {
+            protected CommandAuthChallenge newObject(Handle handle) {
+              return new CommandAuthChallenge(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private CommandAuthChallenge(boolean noInit) {}
+    
+    private static final CommandAuthChallenge defaultInstance;
+    public static CommandAuthChallenge getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandAuthChallenge getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // optional string server_version = 1;
+    public static final int SERVER_VERSION_FIELD_NUMBER = 1;
+    private java.lang.Object serverVersion_;
+    public boolean hasServerVersion() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getServerVersion() {
+      java.lang.Object ref = serverVersion_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          serverVersion_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getServerVersionBytes() {
+      java.lang.Object ref = serverVersion_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+        serverVersion_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+      }
+    }
+    
+    // optional .pulsar.proto.AuthData challenge = 2;
+    public static final int CHALLENGE_FIELD_NUMBER = 2;
+    private org.apache.pulsar.common.api.proto.PulsarApi.AuthData challenge_;
+    public boolean hasChallenge() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.AuthData getChallenge() {
+      return challenge_;
+    }
+    
+    // optional int32 protocol_version = 3 [default = 0];
+    public static final int PROTOCOL_VERSION_FIELD_NUMBER = 3;
+    private int protocolVersion_;
+    public boolean hasProtocolVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public int getProtocolVersion() {
+      return protocolVersion_;
+    }
+    
+    private void initFields() {
+      serverVersion_ = "";
+      challenge_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+      protocolVersion_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getServerVersionBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(2, challenge_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, protocolVersion_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(1, getServerVersionBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(2, challenge_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(3, protocolVersion_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(byte[] data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(
+        byte[] data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseDelimitedFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallengeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.newBuilder(); builders are handed out by RECYCLER and given back through recycle().
+      private final io.netty.util.Recycler.Handle handle; // handle passed in by RECYCLER.newObject(); handed back in recycle()
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() { // pool that creates builders on demand
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() { // resets the builder, then returns it to RECYCLER
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() { // empty body: nothing to initialise for this message
+      }
+      private static Builder create() { // sole source of Builder instances in this class
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() { // restores every field to its default and clears its presence bit
+        super.clear();
+        serverVersion_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        challenge_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        protocolVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+      
+      public Builder clone() { // new pooled builder populated from a partial build of this one
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge build() { // buildPartial() plus an isInitialized() check
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge buildParsed() // like build(), but the failure is converted to InvalidProtocolBufferException
+          throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge buildPartial() { // copies builder state into a message taken from the message RECYCLER
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge result = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) { // only the presence bit is conditional; the value below is copied either way
+          to_bitField0_ |= 0x00000001;
+        }
+        result.serverVersion_ = serverVersion_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.challenge_ = challenge_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.protocolVersion_ = protocolVersion_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge other) { // copies only the fields that are present on other
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.getDefaultInstance()) return this; // identity check against the default instance: nothing to merge
+        if (other.hasServerVersion()) {
+          setServerVersion(other.getServerVersion());
+        }
+        if (other.hasChallenge()) {
+          mergeChallenge(other.getChallenge()); // nested message is merged, not overwritten
+        }
+        if (other.hasProtocolVersion()) {
+          setProtocolVersion(other.getProtocolVersion());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() { // unconditionally true
+        return true;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, // always throws; decoding is implemented only by the ByteBufCodedInputStream overload that follows
+                              org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom( // decodes fields from a ByteBufCodedInputStream into this builder
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) { // loop ends when readTag() yields 0 or skipField() returns false
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: { // tags without a dedicated case are skipped
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: { // server_version (field 1); stored as returned by readBytes(), getServerVersion() decodes it later
+              bitField0_ |= 0x00000001;
+              serverVersion_ = input.readBytes();
+              break;
+            }
+            case 18: { // challenge (field 2); nested AuthData read through a temporary pooled sub-builder
+              org.apache.pulsar.common.api.proto.PulsarApi.AuthData.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.newBuilder();
+              if (hasChallenge()) { // start from the challenge already held so the incoming bytes merge into it
+                subBuilder.mergeFrom(getChallenge());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setChallenge(subBuilder.buildPartial());
+              subBuilder.recycle(); // sub-builder goes back to its pool once its result has been stored
+              break;
+            }
+            case 24: { // protocol_version (field 3)
+              bitField0_ |= 0x00000004;
+              protocolVersion_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits: 0x1 server_version, 0x2 challenge, 0x4 protocol_version
+      
+      // optional string server_version = 1;
+      private java.lang.Object serverVersion_ = ""; // holds either a String or a ByteString
+      public boolean hasServerVersion() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getServerVersion() { // decodes a held ByteString as UTF-8 and caches the String in serverVersion_
+        java.lang.Object ref = serverVersion_;
+        if (!(ref instanceof String)) {
+          String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+          serverVersion_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setServerVersion(String value) { // rejects null with NullPointerException
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        serverVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearServerVersion() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        serverVersion_ = getDefaultInstance().getServerVersion();
+        
+        return this;
+      }
+      void setServerVersion(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) { // package-private ByteString variant: no null check, returns nothing
+        bitField0_ |= 0x00000001;
+        serverVersion_ = value;
+        
+      }
+      
+      // optional .pulsar.proto.AuthData challenge = 2;
+      private org.apache.pulsar.common.api.proto.PulsarApi.AuthData challenge_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+      public boolean hasChallenge() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.AuthData getChallenge() { // returns the held reference regardless of hasChallenge()
+        return challenge_;
+      }
+      public Builder setChallenge(org.apache.pulsar.common.api.proto.PulsarApi.AuthData value) { // rejects null with NullPointerException
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        challenge_ = value;
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder setChallenge( // stores the result of builderForValue.build()
+          org.apache.pulsar.common.api.proto.PulsarApi.AuthData.Builder builderForValue) {
+        challenge_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder mergeChallenge(org.apache.pulsar.common.api.proto.PulsarApi.AuthData value) { // merges into an existing non-default challenge, otherwise adopts value as-is
+        if (((bitField0_ & 0x00000002) == 0x00000002) &&
+            challenge_ != org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance()) {
+          challenge_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.AuthData.newBuilder(challenge_).mergeFrom(value).buildPartial(); // the temporary builder used here is not passed to recycle()
+        } else {
+          challenge_ = value;
+        }
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder clearChallenge() {
+        challenge_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      // optional int32 protocol_version = 3 [default = 0];
+      private int protocolVersion_ ;
+      public boolean hasProtocolVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getProtocolVersion() {
+        return protocolVersion_;
+      }
+      public Builder setProtocolVersion(int value) {
+        bitField0_ |= 0x00000004;
+        protocolVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearProtocolVersion() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        protocolVersion_ = 0;
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandAuthChallenge)
+    }
+    
+    static { // class initialisation: create the shared default instance and call initFields() on it
+      defaultInstance = new CommandAuthChallenge(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandAuthChallenge)
+  }
+  
+  public interface AuthDataOrBuilder // read-only accessors implemented by both AuthData and AuthData.Builder
+      extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // optional string auth_method_name = 1;
+    boolean hasAuthMethodName(); // presence check for auth_method_name
+    String getAuthMethodName();
+    
+    // optional bytes auth_data = 2;
+    boolean hasAuthData(); // presence check for auth_data
+    org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getAuthData();
+  }
+  public static final class AuthData extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements AuthDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use AuthData.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle; // set only by the Handle constructor; stays null when AuthData(boolean) is used
+    private AuthData(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<AuthData> RECYCLER = new io.netty.util.Recycler<AuthData>() { // pool from which Builder.buildPartial() takes message instances
+            protected AuthData newObject(Handle handle) {
+              return new AuthData(handle);
+            }
+          };
+        
+        public void recycle() { // resets fields, presence bits and memoized values, then returns the instance to RECYCLER when it has a handle
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private AuthData(boolean noInit) {} // used for defaultInstance; leaves handle null so that instance is never pooled
+    
+    private static final AuthData defaultInstance;
+    public static AuthData getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public AuthData getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_; // presence bits: 0x1 auth_method_name, 0x2 auth_data
+    // optional string auth_method_name = 1;
+    public static final int AUTH_METHOD_NAME_FIELD_NUMBER = 1;
+    private java.lang.Object authMethodName_; // holds either a String or a ByteString
+    public boolean hasAuthMethodName() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getAuthMethodName() { // decodes a held ByteString as UTF-8; the String is cached only when the bytes are valid UTF-8
+      java.lang.Object ref = authMethodName_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          authMethodName_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getAuthMethodNameBytes() { // converts a held String to a UTF-8 ByteString and caches the ByteString
+      java.lang.Object ref = authMethodName_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+        authMethodName_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+      }
+    }
+    
+    // optional bytes auth_data = 2;
+    public static final int AUTH_DATA_FIELD_NUMBER = 2;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString authData_;
+    public boolean hasAuthData() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getAuthData() { // returns the held value regardless of hasAuthData()
+      return authData_;
+    }
+    
+    private void initFields() { // assigns the default value to every field
+      authMethodName_ = "";
+      authData_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1; // -1 means not computed yet, 1 means initialized
+    public final boolean isInitialized() { // always ends up true; the first call memoizes 1
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output) // always throws RuntimeException; use the ByteBufCodedOutputStream overload
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) // writes each field whose presence bit is set
+                        throws java.io.IOException {
+      getSerializedSize(); // return value unused; the call fills memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getAuthMethodNameBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, authData_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 means not computed yet
+    public int getSerializedSize() { // sums the encoded size of the present fields and memoizes the total
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(1, getAuthMethodNameBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(2, authData_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace() // delegates straight to the superclass implementation
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom( // always throws RuntimeException("Disabled")
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom( // always throws RuntimeException("Disabled")
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom(byte[] data) // NOTE(review): relies on an inherited mergeFrom(byte[]); presumably that ends in Builder.mergeFrom(CodedInputStream, ...), which throws - confirm
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom( // byte[] variant with an extension registry; same inherited-mergeFrom caveat applies - confirm
+        byte[] data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom(java.io.InputStream input) // InputStream variant; goes through an inherited mergeFrom(InputStream)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom( // InputStream variant with an extension registry
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseDelimitedFrom(java.io.InputStream input) // returns null when mergeDelimitedFrom reports false
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseDelimitedFrom( // returns null when mergeDelimitedFrom reports false
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom( // NOTE(review): one-argument mergeFrom is inherited; presumably it reaches the throwing Builder overload - confirm
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.AuthData parseFrom( // binds to Builder.mergeFrom(CodedInputStream, ExtensionRegistryLite), whose body only throws IOException
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); } // Builder.create() hands out a builder from the Builder's Recycler
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.AuthData prototype) { // builder pre-populated with the fields present on prototype
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.AuthData, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.AuthDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.AuthData.newBuilder(); builders are handed out by RECYCLER and given back through recycle().
+      private final io.netty.util.Recycler.Handle handle; // handle passed in by RECYCLER.newObject(); handed back in recycle()
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() { // pool that creates builders on demand
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() { // resets the builder, then returns it to RECYCLER
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() { // empty body: nothing to initialise for this message
+      }
+      private static Builder create() { // sole source of Builder instances in this class
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() { // restores both fields to their defaults and clears their presence bits
+        super.clear();
+        authMethodName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        authData_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() { // new pooled builder populated from a partial build of this one
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.AuthData getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.AuthData build() { // buildPartial() plus an isInitialized() check
+        org.apache.pulsar.common.api.proto.PulsarApi.AuthData result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.AuthData buildParsed() // like build(), but the failure is converted to InvalidProtocolBufferException
+          throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.AuthData result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.AuthData buildPartial() { // copies builder state into a message taken from AuthData.RECYCLER
+        org.apache.pulsar.common.api.proto.PulsarApi.AuthData result = org.apache.pulsar.common.api.proto.PulsarApi.AuthData.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) { // only the presence bit is conditional; the value below is copied either way
+          to_bitField0_ |= 0x00000001;
+        }
+        result.authMethodName_ = authMethodName_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.authData_ = authData_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.AuthData other) { // copies only the fields that are present on other
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.AuthData.getDefaultInstance()) return this; // identity check against the default instance: nothing to merge
+        if (other.hasAuthMethodName()) {
+          setAuthMethodName(other.getAuthMethodName());
+        }
+        if (other.hasAuthData()) {
+          setAuthData(other.getAuthData());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() { // unconditionally true
+        return true;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, // always throws; decoding is implemented only by the ByteBufCodedInputStream overload that follows
+                              org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom( // decodes fields from a ByteBufCodedInputStream into this builder
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) { // loop ends when readTag() yields 0 or skipField() returns false
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: { // tags without a dedicated case are skipped
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: { // auth_method_name (field 1); stored as returned by readBytes(), getAuthMethodName() decodes it later
+              bitField0_ |= 0x00000001;
+              authMethodName_ = input.readBytes();
+              break;
+            }
+            case 18: { // auth_data (field 2)
+              bitField0_ |= 0x00000002;
+              authData_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits: 0x1 auth_method_name, 0x2 auth_data
+      
+      // optional string auth_method_name = 1;
+      private java.lang.Object authMethodName_ = ""; // holds either a String or a ByteString
+      public boolean hasAuthMethodName() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getAuthMethodName() { // decodes a held ByteString as UTF-8 and caches the String unconditionally
+        java.lang.Object ref = authMethodName_;
+        if (!(ref instanceof String)) {
+          String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+          authMethodName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setAuthMethodName(String value) { // rejects null with NullPointerException
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        authMethodName_ = value;
+        
+        return this;
+      }
+      public Builder clearAuthMethodName() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        authMethodName_ = getDefaultInstance().getAuthMethodName();
+        
+        return this;
+      }
+      void setAuthMethodName(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) { // package-private ByteString variant: no null check, returns nothing
+        bitField0_ |= 0x00000001;
+        authMethodName_ = value;
+        
+      }
+      
+      // optional bytes auth_data = 2;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString authData_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      public boolean hasAuthData() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getAuthData() {
+        return authData_;
+      }
+      public Builder setAuthData(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) { // rejects null with NullPointerException
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        authData_ = value;
+        
+        return this;
+      }
+      public Builder clearAuthData() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        authData_ = getDefaultInstance().getAuthData();
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.AuthData)
+    }
+    
+    static { // class initialisation: create the shared default instance and give its fields their defaults
+      defaultInstance = new AuthData(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.AuthData)
+  }
+  
   public interface CommandSubscribeOrBuilder
       extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
     
@@ -25098,6 +26514,14 @@ public final class PulsarApi {
     // optional .pulsar.proto.CommandGetSchemaResponse getSchemaResponse = 35;
     boolean hasGetSchemaResponse();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse getGetSchemaResponse();
+    
+    // optional .pulsar.proto.CommandAuthChallenge authChallenge = 36;
+    boolean hasAuthChallenge();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge getAuthChallenge();
+    
+    // optional .pulsar.proto.CommandAuthResponse authResponse = 37;
+    boolean hasAuthResponse();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse getAuthResponse();
   }
   public static final class BaseCommand extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -25170,6 +26594,8 @@ public final class PulsarApi {
       GET_TOPICS_OF_NAMESPACE_RESPONSE(31, 33),
       GET_SCHEMA(32, 34),
       GET_SCHEMA_RESPONSE(33, 35),
+      AUTH_CHALLENGE(34, 36),
+      AUTH_RESPONSE(35, 37),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -25206,6 +26632,8 @@ public final class PulsarApi {
       public static final int GET_TOPICS_OF_NAMESPACE_RESPONSE_VALUE = 33;
       public static final int GET_SCHEMA_VALUE = 34;
       public static final int GET_SCHEMA_RESPONSE_VALUE = 35;
+      public static final int AUTH_CHALLENGE_VALUE = 36;
+      public static final int AUTH_RESPONSE_VALUE = 37;
       
       
       public final int getNumber() { return value; }
@@ -25246,6 +26674,8 @@ public final class PulsarApi {
           case 33: return GET_TOPICS_OF_NAMESPACE_RESPONSE;
           case 34: return GET_SCHEMA;
           case 35: return GET_SCHEMA_RESPONSE;
+          case 36: return AUTH_CHALLENGE;
+          case 37: return AUTH_RESPONSE;
           default: return null;
         }
       }
@@ -25623,6 +27053,26 @@ public final class PulsarApi {
       return getSchemaResponse_;
     }
     
+    // optional .pulsar.proto.CommandAuthChallenge authChallenge = 36;
+    public static final int AUTHCHALLENGE_FIELD_NUMBER = 36;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge authChallenge_;
+    public boolean hasAuthChallenge() { // true when bit 0x00000008 of bitField1_ is set
+      return ((bitField1_ & 0x00000008) == 0x00000008);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge getAuthChallenge() { // returns the stored reference without consulting hasAuthChallenge()
+      return authChallenge_;
+    }
+    
+    // optional .pulsar.proto.CommandAuthResponse authResponse = 37;
+    public static final int AUTHRESPONSE_FIELD_NUMBER = 37;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse authResponse_;
+    public boolean hasAuthResponse() { // true when bit 0x00000010 of bitField1_ is set
+      return ((bitField1_ & 0x00000010) == 0x00000010);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse getAuthResponse() { // returns the stored reference without consulting hasAuthResponse()
+      return authResponse_;
+    }
+    
     private void initFields() {
       type_ = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -25659,6 +27109,8 @@ public final class PulsarApi {
       getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
       getSchema_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema.getDefaultInstance();
       getSchemaResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
+      authChallenge_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.getDefaultInstance();
+      authResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -25978,6 +27430,12 @@ public final class PulsarApi {
       if (((bitField1_ & 0x00000004) == 0x00000004)) {
         output.writeMessage(35, getSchemaResponse_);
       }
+      if (((bitField1_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(36, authChallenge_);
+      }
+      if (((bitField1_ & 0x00000010) == 0x00000010)) {
+        output.writeMessage(37, authResponse_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -26126,6 +27584,14 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeMessageSize(35, getSchemaResponse_);
       }
+      if (((bitField1_ & 0x00000008) == 0x00000008)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(36, authChallenge_);
+      }
+      if (((bitField1_ & 0x00000010) == 0x00000010)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(37, authResponse_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -26309,6 +27775,10 @@ public final class PulsarApi {
         bitField1_ = (bitField1_ & ~0x00000002);
         getSchemaResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse.getDefaultInstance();
         bitField1_ = (bitField1_ & ~0x00000004);
+        authChallenge_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.getDefaultInstance();
+        bitField1_ = (bitField1_ & ~0x00000008);
+        authResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.getDefaultInstance();
+        bitField1_ = (bitField1_ & ~0x00000010);
         return this;
       }
       
@@ -26484,6 +27954,14 @@ public final class PulsarApi {
           to_bitField1_ |= 0x00000004;
         }
         result.getSchemaResponse_ = getSchemaResponse_;
+        if (((from_bitField1_ & 0x00000008) == 0x00000008)) {
+          to_bitField1_ |= 0x00000008;
+        }
+        result.authChallenge_ = authChallenge_;
+        if (((from_bitField1_ & 0x00000010) == 0x00000010)) {
+          to_bitField1_ |= 0x00000010;
+        }
+        result.authResponse_ = authResponse_;
         result.bitField0_ = to_bitField0_;
         result.bitField1_ = to_bitField1_;
         return result;
@@ -26596,6 +28074,12 @@ public final class PulsarApi {
         if (other.hasGetSchemaResponse()) {
           mergeGetSchemaResponse(other.getGetSchemaResponse());
         }
+        if (other.hasAuthChallenge()) {
+          mergeAuthChallenge(other.getAuthChallenge());
+        }
+        if (other.hasAuthResponse()) {
+          mergeAuthResponse(other.getAuthResponse());
+        }
         return this;
       }
       
@@ -27170,6 +28654,26 @@ public final class PulsarApi {
               subBuilder.recycle();
               break;
             }
+            case 290: {
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.newBuilder();
+              if (hasAuthChallenge()) {
+                subBuilder.mergeFrom(getAuthChallenge());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setAuthChallenge(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 298: {
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.newBuilder();
+              if (hasAuthResponse()) {
+                subBuilder.mergeFrom(getAuthResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setAuthResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -28663,6 +30167,92 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional .pulsar.proto.CommandAuthChallenge authChallenge = 36;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge authChallenge_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.getDefaultInstance();
+      public boolean hasAuthChallenge() { // true when bit 0x00000008 of bitField1_ is set, i.e. after set/mergeAuthChallenge and before clearAuthChallenge
+        return ((bitField1_ & 0x00000008) == 0x00000008);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge getAuthChallenge() { // returns authChallenge_ as stored, without consulting hasAuthChallenge()
+        return authChallenge_;
+      }
+      public Builder setAuthChallenge(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge value) { // stores value as the authChallenge field and returns this builder
+        if (value == null) { // null is rejected with NullPointerException; use clearAuthChallenge() to unset
+          throw new NullPointerException();
+        }
+        authChallenge_ = value;
+        // Mark the field as present (bit 0x00000008 of bitField1_).
+        bitField1_ |= 0x00000008;
+        return this;
+      }
+      public Builder setAuthChallenge(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.Builder builderForValue) {
+        authChallenge_ = builderForValue.build();
+        // Builder overload: the message produced by builderForValue.build() is stored, then the field is marked present.
+        bitField1_ |= 0x00000008;
+        return this;
+      }
+      public Builder mergeAuthChallenge(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge value) {
+        if (((bitField1_ & 0x00000008) == 0x00000008) &&
+            authChallenge_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.getDefaultInstance()) { // reference comparison against the default instance
+          authChallenge_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.newBuilder(authChallenge_).mergeFrom(value).buildPartial();
+        } else { // field not set yet, or still the default instance: adopt value directly
+          authChallenge_ = value;
+        }
+        // Either branch leaves the field present: merged into the existing value, or replaced by value.
+        bitField1_ |= 0x00000008;
+        return this;
+      }
+      public Builder clearAuthChallenge() {
+        authChallenge_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge.getDefaultInstance();
+        // Value is reset to the default instance above; now drop bit 0x00000008 so hasAuthChallenge() reports false.
+        bitField1_ = (bitField1_ & ~0x00000008);
+        return this;
+      }
+      
+      // optional .pulsar.proto.CommandAuthResponse authResponse = 37;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse authResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.getDefaultInstance();
+      public boolean hasAuthResponse() { // true when bit 0x00000010 of bitField1_ is set, i.e. after set/mergeAuthResponse and before clearAuthResponse
+        return ((bitField1_ & 0x00000010) == 0x00000010);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse getAuthResponse() { // returns authResponse_ as stored, without consulting hasAuthResponse()
+        return authResponse_;
+      }
+      public Builder setAuthResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse value) { // stores value as the authResponse field and returns this builder
+        if (value == null) { // null is rejected with NullPointerException; use clearAuthResponse() to unset
+          throw new NullPointerException();
+        }
+        authResponse_ = value;
+        // Mark the field as present (bit 0x00000010 of bitField1_).
+        bitField1_ |= 0x00000010;
+        return this;
+      }
+      public Builder setAuthResponse(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.Builder builderForValue) {
+        authResponse_ = builderForValue.build();
+        // Builder overload: the message produced by builderForValue.build() is stored, then the field is marked present.
+        bitField1_ |= 0x00000010;
+        return this;
+      }
+      public Builder mergeAuthResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse value) {
+        if (((bitField1_ & 0x00000010) == 0x00000010) &&
+            authResponse_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.getDefaultInstance()) { // reference comparison against the default instance
+          authResponse_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.newBuilder(authResponse_).mergeFrom(value).buildPartial();
+        } else { // field not set yet, or still the default instance: adopt value directly
+          authResponse_ = value;
+        }
+        // Either branch leaves the field present: merged into the existing value, or replaced by value.
+        bitField1_ |= 0x00000010;
+        return this;
+      }
+      public Builder clearAuthResponse() {
+        authResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse.getDefaultInstance();
+        // Value is reset to the default instance above; now drop bit 0x00000010 so hasAuthResponse() reports false.
+        bitField1_ = (bitField1_ & ~0x00000010);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 3f10d49..b3e6f0f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -168,6 +168,7 @@ enum ProtocolVersion {
 			 // Added CommandActiveConsumerChange
 			 // Added CommandGetTopicsOfNamespace
 	v13 = 13; // Schema-registry : added avro schema format for json
+	v14 = 14; // Add CommandAuthChallenge and CommandAuthResponse for mutual auth
 }
 
 message CommandConnect {
@@ -199,6 +200,24 @@ message CommandConnected {
 	optional int32 protocol_version = 2 [default = 0];
 }
 
+message CommandAuthResponse { // NOTE(review): presumably sent by the client in answer to a CommandAuthChallenge (it carries client_version) - confirm against ClientCnx/ServerCnx
+	optional string client_version = 1;
+	optional AuthData response = 2; // auth method name plus opaque payload bytes, see AuthData
+	optional int32 protocol_version = 3 [default = 0];
+}
+
+message CommandAuthChallenge { // NOTE(review): presumably sent by the broker to request further auth data (it carries server_version) - confirm against ServerCnx/ClientCnx
+	optional string server_version = 1;
+	optional AuthData challenge = 2; // auth method name plus opaque payload bytes, see AuthData
+	optional int32 protocol_version = 3 [default = 0];
+}
+
+// Authentication payload shared by CommandAuthChallenge and CommandAuthResponse, so that mutual authentication mechanisms such as SASL can exchange data.
+message AuthData {
+	optional string auth_method_name = 1;
+	optional bytes auth_data = 2;
+}
+
 message CommandSubscribe {
 	enum SubType {
 		Exclusive = 0;
@@ -598,6 +617,9 @@ message BaseCommand {
 
 		GET_SCHEMA = 34;
 		GET_SCHEMA_RESPONSE = 35;
+
+		AUTH_CHALLENGE = 36;
+		AUTH_RESPONSE = 37;
 	}
 
 
@@ -650,4 +672,8 @@ message BaseCommand {
 
 	optional CommandGetSchema getSchema = 34;
 	optional CommandGetSchemaResponse getSchemaResponse = 35;
+
+	optional CommandAuthChallenge authChallenge = 36;
+	optional CommandAuthResponse authResponse = 37;
+
 }