You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/04/25 13:28:47 UTC

[pulsar] branch master updated: Issue #3653: Kerberos authentication for web resource support (#4097)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2777b0e  Issue #3653: Kerberos authentication for web resource support (#4097)
2777b0e is described below

commit 2777b0e4bd6d1a70b1343b2834c2630e6a7e1e5d
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Thu Apr 25 21:28:40 2019 +0800

    Issue #3653: Kerberos authentication for web resource support (#4097)
    
    Fixes #3653
    
    Master Issue: #3491
    
    ** Motivation
    Add Kerberos authentication support for web resources.
    This mainly includes 2 parts:
    
    - the HttpClient that works for HttpLookup.
    - the BaseResource that works for admin rest end point.
    
    *** Modifications
    For Kerberos authentication, several back-and-forth requests are needed to do the negotiation between client and server.
    This change adds a method authenticationStage in AuthenticationSasl, and a method authenticateHttpRequest in AuthenticationProviderSasl, to do the mutual negotiation.
    A saslRoleToken is cached in AuthenticationSasl once the authentication succeeds.
    When doing the SASL authentication, the client first uses the cached saslRoleToken; if the server fails to verify this token, the real SASL authentication is done.
    Changed the unit test SaslAuthenticateTest, which now enables SASL authentication in admin and also uses HTTP lookup to verify the change.
---
 conf/broker.conf                                   |   5 -
 .../authentication/AuthenticationProviderSasl.java | 185 ++++++++++++++++-
 .../broker/authentication/PulsarSaslServer.java    |  12 --
 .../SaslAuthenticationDataSource.java              |   6 -
 .../authentication/SaslAuthenticationState.java    |  15 +-
 .../broker/authentication/SaslRoleToken.java       | 228 +++++++++++++++++++++
 .../broker/authentication/SaslRoleTokenSigner.java | 107 ++++++++++
 .../ProxySaslAuthenticationTest.java               |  27 ++-
 .../authentication/SaslAuthenticateTest.java       |  91 +++++++-
 .../authentication/SaslServerTokenSignerTest.java  | 111 ++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  10 -
 .../authentication/AuthenticationProvider.java     |  10 +
 .../broker/authentication/AuthenticationState.java |  11 +-
 .../pulsar/broker/web/AuthenticationFilter.java    |  48 ++++-
 .../apache/pulsar/broker/service/ServerCnx.java    |   7 -
 .../pulsar/broker/web/PulsarWebResource.java       |  24 +--
 .../org/apache/pulsar/broker/web/WebService.java   |   5 +-
 .../pulsar/client/admin/internal/BaseResource.java |  56 ++++-
 .../client/admin/internal/ComponentResource.java   |  31 ++-
 .../client/admin/internal/FunctionsImpl.java       |   8 +-
 .../pulsar/client/admin/internal/SinkImpl.java     |   4 +-
 .../pulsar/client/admin/internal/SourceImpl.java   |   4 +-
 .../apache/pulsar/client/api/Authentication.java   |  26 ++-
 .../client/api/AuthenticationDataProvider.java     |   2 +-
 pulsar-client-auth-sasl/pom.xml                    |   5 +
 .../client/impl/auth/AuthenticationSasl.java       | 214 ++++++++++++++++++-
 .../impl/auth/SaslAuthenticationDataProvider.java  |  19 ++
 .../org/apache/pulsar/client/impl/HttpClient.java  | 116 ++++++-----
 .../apache/pulsar/common/sasl/SaslConstants.java   |  26 +++
 .../apache/pulsar/websocket/WebSocketService.java  |   5 +-
 .../websocket/admin/WebSocketWebResource.java      |   2 +-
 31 files changed, 1242 insertions(+), 178 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1bb171c..8a7da2b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -351,11 +351,6 @@ tokenAuthClaim=
 
 ### --- SASL Authentication Provider --- ###
 
-# Whether Use SASL Authentication or not.
-# TODO: used to bypass web resource check. will remove it after implementation the support.
-# github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
-isSaslAuthentication=
-
 # This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.
 # Default value: `SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT`, which is ".*pulsar.*",
 # so only clients whose id contains 'pulsar' are allowed to connect.
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index a43cdb0..4ab7d01 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -18,21 +18,39 @@
  */
 package org.apache.pulsar.broker.authentication;
 
-import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
+import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
 import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
+import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
 import static org.apache.pulsar.common.sasl.SaslConstants.KINIT_COMMAND;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN_EXPIRED;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_TOKEN;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_HEADER_STATE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_CLIENT_INIT;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_COMPLETE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_NEGOTIATE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER_CHECK_TOKEN;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Base64;
 import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import javax.security.auth.login.LoginException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
 import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.AuthData;
@@ -76,6 +94,8 @@ public class AuthenticationProviderSasl implements AuthenticationProvider {
                 throw new IOException(e);
             }
         }
+
+        this.signer = new SaslRoleTokenSigner(Long.toString(new Random().nextLong()).getBytes());
     }
 
     @Override
@@ -109,4 +129,167 @@ public class AuthenticationProviderSasl implements AuthenticationProvider {
             throw new AuthenticationException(t.getMessage());
         }
     }
+
+    // for http auth.
+    private static final long SASL_ROLE_TOKEN_LIVE_SECONDS = 3600;
+    // A signer for http role token, with random secret.
+    private SaslRoleTokenSigner signer;
+
+    /**
+     * Returns null if the request carries no role token header.
+     * Otherwise returns the role held in the signed role token, or SASL_AUTH_ROLE_TOKEN_EXPIRED if the token cannot be parsed or has expired.
+     */
+    public String authRoleFromHttpRequest(HttpServletRequest httpRequest) throws AuthenticationException {
+        String tokenStr = httpRequest.getHeader(SASL_AUTH_ROLE_TOKEN);
+
+        if (tokenStr == null) {
+            return null;
+        }
+
+        String unSigned = signer.verifyAndExtract(tokenStr);
+        SaslRoleToken token;
+
+        try {
+            token = SaslRoleToken.parse(unSigned);
+            if (log.isDebugEnabled()) {
+                log.debug("server side get role token: {}, session in token:{}, session in request:{}",
+                    token, token.getSession(), httpRequest.getRemoteAddr());
+            }
+        } catch (Exception e) {
+            log.error("token parse failed, with exception: ",  e);
+            return SASL_AUTH_ROLE_TOKEN_EXPIRED;
+        }
+
+        if (!token.isExpired()) {
+            return token.getUserRole();
+        } else if (token.isExpired()) {
+            return SASL_AUTH_ROLE_TOKEN_EXPIRED;
+        } else {
+            return null;
+        }
+    }
+
+    private String createAuthRoleToken(String role, String sessionId) {
+        long expireAtMs = System.currentTimeMillis() + SASL_ROLE_TOKEN_LIVE_SECONDS * 1000; // 1 hour
+        SaslRoleToken token = new SaslRoleToken(role, sessionId, expireAtMs);
+
+        String signed = signer.sign(token.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("create role token token: {}, role: {} session :{}, expires:{}\nsigned:{}",
+                token, token.getUserRole(), token.getSession(), token.getExpires(), signed);
+        }
+        return signed;
+    }
+
+    private ConcurrentHashMap<Long, AuthenticationState> authStates = new ConcurrentHashMap<>();
+
+    // return authState if it is in cache.
+    private AuthenticationState getAuthState(HttpServletRequest request) {
+        String id = request.getHeader(SASL_STATE_SERVER);
+        if (id == null) {
+            return null;
+        }
+
+        try {
+            return authStates.get(Long.parseLong(id));
+        } catch (NumberFormatException e) {
+            log.error("[{}] Wrong Id String in Token {}. e:", request.getRequestURI(),
+                id, e);
+            return null;
+        }
+    }
+
+    private void setResponseHeaderState(HttpServletResponse response, String state) {
+        response.setHeader(SaslConstants.SASL_HEADER_TYPE, SaslConstants.SASL_TYPE_VALUE);
+        response.setHeader(SASL_HEADER_STATE, state);
+    }
+
+    /**
+     * Processes the passed-in request and sets the response according to it,
+     * and returns whether the caller should go on with the following chain.doFilter or not.
+     */
+    @Override
+    public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
+        AuthenticationState state = getAuthState(request);
+        String saslAuthRoleToken = authRoleFromHttpRequest(request);
+
+        // role token exist
+        if (saslAuthRoleToken != null) {
+            // role token expired, send role token expired to client.
+            if (saslAuthRoleToken.equalsIgnoreCase(SASL_AUTH_ROLE_TOKEN_EXPIRED)) {
+                setResponseHeaderState(response, SASL_AUTH_ROLE_TOKEN_EXPIRED);
+                response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Role token expired");
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Server side role token expired: {}", request.getRequestURI(), saslAuthRoleToken);
+                }
+                return false;
+            }
+
+            // role token OK to use,
+            // if request is ask for role token verify, send auth complete to client
+            // if request is a real request with valid role token, pass this request down.
+            if (request.getHeader(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_COMPLETE)) {
+                request.setAttribute(AuthenticatedRoleAttributeName, saslAuthRoleToken);
+                request.setAttribute(AuthenticatedDataAttributeName,
+                    new AuthenticationDataHttps(request));
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Server side role token OK to go on: {}", request.getRequestURI(), saslAuthRoleToken);
+                }
+                return true;
+            } else {
+                checkState(request.getHeader(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_SERVER_CHECK_TOKEN));
+                setResponseHeaderState(response, SASL_STATE_COMPLETE);
+                response.setHeader(SASL_STATE_SERVER, request.getHeader(SASL_STATE_SERVER));
+                response.setStatus(HttpServletResponse.SC_OK);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Server side role token verified success: {}", request.getRequestURI(), saslAuthRoleToken);
+                }
+                return false;
+            }
+        } else {
+            // no role token, do sasl auth
+            // need new authState
+            if (state == null || request.getHeader(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_CLIENT_INIT)) {
+                state = newAuthState(null, null, null);
+                authStates.put(state.getStateId(), state);
+            }
+            checkState(request.getHeader(SASL_AUTH_TOKEN) != null,
+                "Header token should exist if no role token.");
+
+            // do the sasl auth
+            AuthData clientData = AuthData.of(Base64.getDecoder().decode(
+                request.getHeader(SASL_AUTH_TOKEN)));
+            AuthData brokerData = state.authenticate(clientData);
+
+            // authentication has completed, it has get the auth role.
+            if (state.isComplete()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] SASL server authentication complete, send OK to client.", request.getRequestURI());
+                }
+                String authRole = state.getAuthRole();
+                String authToken = createAuthRoleToken(authRole, String.valueOf(state.getStateId()));
+                response.setHeader(SASL_AUTH_ROLE_TOKEN, authToken);
+
+                // auth request complete, return OK, wait for a new real request to come.
+                response.setHeader(SASL_STATE_SERVER, String.valueOf(state.getStateId()));
+                setResponseHeaderState(response, SASL_STATE_COMPLETE);
+                response.setStatus(HttpServletResponse.SC_OK);
+
+                // auth completed, no need to keep authState
+                authStates.remove(state.getStateId());
+                return false;
+            } else {
+                // auth not complete
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] SASL server authentication not complete, send {} back to client.",
+                        request.getRequestURI(), HttpServletResponse.SC_UNAUTHORIZED);
+                }
+                setResponseHeaderState(response, SASL_STATE_NEGOTIATE);
+                response.setHeader(SASL_STATE_SERVER, String.valueOf(state.getStateId()));
+                response.setHeader(SASL_AUTH_TOKEN, Base64.getEncoder().encodeToString(brokerData.getBytes()));
+                response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "SASL Authentication not complete.");
+                return false;
+            }
+        }
+    }
 }
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
index f7e7eb9..8e78e44 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
@@ -186,18 +186,6 @@ public class PulsarSaslServer {
             ac.setAuthorized(true);
             log.info("Successfully authenticated client: authenticationID: {};  authorizationID: {}.",
                 authenticationID, authorizationID);
-
-            KerberosName kerberosName = new KerberosName(authenticationID);
-            try {
-                StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
-                userNameBuilder.append("/").append(kerberosName.getHostName());
-                userNameBuilder.append("@").append(kerberosName.getRealm());
-
-                log.info("Setting authorizedID: {} ", userNameBuilder);
-                ac.setAuthorizedID(userNameBuilder.toString());
-            } catch (IOException e) {
-                log.error("Failed to set name based on Kerberos authentication rules.");
-            }
         }
     }
 }
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationDataSource.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationDataSource.java
index 2fe6546..59b408a 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationDataSource.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationDataSource.java
@@ -53,10 +53,4 @@ public class SaslAuthenticationDataSource implements AuthenticationDataSource {
     public String getAuthorizationID() {
         return pulsarSaslServer.getAuthorizationID();
     }
-
-    // TODO: for http support. github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
-    /* default boolean hasDataFromHttp() {
-        return false;
-    }*/
-
 }
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationState.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationState.java
index d345d81..f2dbb6f 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationState.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationState.java
@@ -21,8 +21,11 @@ package org.apache.pulsar.broker.authentication;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import javax.naming.AuthenticationException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.api.AuthData;
 
 /**
@@ -31,10 +34,14 @@ import org.apache.pulsar.common.api.AuthData;
  * It is basically holding the the authentication state.
  * It tell broker whether the authentication is completed or not,
  */
+@Slf4j
 public class SaslAuthenticationState implements AuthenticationState {
-    private SaslAuthenticationDataSource authenticationDataSource;
+    private final long stateId;
+    private static final AtomicLong stateIdGenerator = new AtomicLong(0L);
+    private final SaslAuthenticationDataSource authenticationDataSource;
 
     public SaslAuthenticationState(AuthenticationDataSource authenticationDataSource) {
+        stateId = stateIdGenerator.incrementAndGet();
         checkArgument(authenticationDataSource instanceof SaslAuthenticationDataSource);
         this.authenticationDataSource = (SaslAuthenticationDataSource)authenticationDataSource;
     }
@@ -58,9 +65,15 @@ public class SaslAuthenticationState implements AuthenticationState {
      * Returns null if authentication has completed, and no auth data is required to send back to client.
      * Do auth and Returns the auth data back to client, if authentication has not completed.
      */
+    @Override
     public AuthData authenticate(AuthData authData) throws AuthenticationException {
         return authenticationDataSource.authenticate(authData);
     }
 
+    @Override
+    public long getStateId() {
+        return stateId;
+    }
+
 
 }
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslRoleToken.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslRoleToken.java
new file mode 100644
index 0000000..f95bc6a
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslRoleToken.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import javax.naming.AuthenticationException;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SaslRoleToken implements Principal {
+
+    /**
+     * Constant that identifies an anonymous request.
+     */
+    public static final SaslRoleToken ANONYMOUS = new SaslRoleToken();
+
+    private static final String ATTR_SEPARATOR = "&";
+    private static final String USER_ROLE = "u";
+    private static final String EXPIRES = "e";
+    private static final String SESSION = "i";
+
+    private final static Set<String> ATTRIBUTES =
+        new HashSet<String>(Arrays.asList(USER_ROLE,  EXPIRES,  SESSION));
+
+    private String userRole;
+    private String session;
+    private long expires;
+    private String token;
+
+    private SaslRoleToken() {
+        userRole = null;
+        session = null;
+        expires = -1;
+        token = "ANONYMOUS";
+        generateToken();
+    }
+
+    private static final String ILLEGAL_ARG_MSG = " is NULL, empty or contains a '" + ATTR_SEPARATOR + "'";
+
+    /**
+     * Creates an authentication token.
+     * The token has no expiration: expires is set to -1 until setExpires is called.
+     *
+     * @param userRole the user role.
+     * @param session the sessionId.
+     */
+    public SaslRoleToken(String userRole, String session) {
+        checkForIllegalArgument(session, "session");
+        this.userRole = userRole;
+        this.session = session;
+        this.expires = -1;
+        generateToken();
+    }
+
+    public SaslRoleToken(String userRole, String session, long expires) {
+        checkForIllegalArgument(userRole, "userRole");
+        checkForIllegalArgument(session, "session");
+        this.userRole = userRole;
+        this.session = session;
+        this.expires = expires;
+        generateToken();
+    }
+
+    /**
+     * Check if the provided value is invalid. Throw an error if it is invalid, NOP otherwise.
+     *
+     * @param value the value to check.
+     * @param name the parameter name to use in an error message if the value is invalid.
+     */
+    private static void checkForIllegalArgument(String value, String name) {
+        if (value == null || value.length() == 0 || value.contains(ATTR_SEPARATOR)) {
+            throw new IllegalArgumentException(name + ILLEGAL_ARG_MSG);
+        }
+    }
+
+    /**
+     * Sets the expiration of the token.
+     *
+     * @param expires expiration time of the token in milliseconds since the epoch.
+     */
+    public void setExpires(long expires) {
+        if (this != SaslRoleToken.ANONYMOUS) {
+            this.expires = expires;
+            generateToken();
+        }
+    }
+
+    /**
+     * Generates the token.
+     */
+    private void generateToken() {
+        StringBuffer sb = new StringBuffer();
+        sb.append(USER_ROLE).append("=").append(getUserRole()).append(ATTR_SEPARATOR);
+        sb.append(SESSION).append("=").append(getSession()).append(ATTR_SEPARATOR);
+        sb.append(EXPIRES).append("=").append(getExpires());
+        token = sb.toString();
+    }
+
+    /**
+     * Returns the user name.
+     *
+     * @return the user name.
+     */
+    public String getUserRole() {
+        return userRole;
+    }
+
+    /**
+     * Returns the principal name (this method name comes from the JDK {@link Principal} interface).
+     *
+     * @return the principal name.
+     */
+    @Override
+    public String getName() {
+        return userRole;
+    }
+
+    /**
+     * Returns the session id of the token.
+     *
+     * @return the session id of the token.
+     */
+    public String getSession() {
+        return session;
+    }
+
+    /**
+     * Returns the expiration time of the token.
+     *
+     * @return the expiration time of the token, in milliseconds since the epoch.
+     */
+    public long getExpires() {
+        return expires;
+    }
+
+    /**
+     * Returns if the token has expired.
+     *
+     * @return if the token has expired.
+     */
+    public boolean isExpired() {
+        return getExpires() != -1 && System.currentTimeMillis() > getExpires();
+    }
+
+    /**
+     * Returns the string representation of the token.
+     * <p/>
+     * This string representation is parseable by the {@link #parse} method.
+     *
+     * @return the string representation of the token.
+     */
+    @Override
+    public String toString() {
+        return token;
+    }
+
+    /**
+     * Parses a string into an authentication token.
+     *
+     * @param tokenStr string representation of a token.
+     *
+     * @return the parsed authentication token.
+     *
+     * @throws AuthenticationException thrown if the string representation could not be parsed into
+     * an authentication token.
+     */
+    public static SaslRoleToken parse(String tokenStr) throws AuthenticationException {
+        Map<String, String> map = split(tokenStr);
+        if (!map.keySet().equals(ATTRIBUTES)) {
+            throw new AuthenticationException("Invalid token string, missing attributes");
+        }
+        long expires = Long.parseLong(map.get(EXPIRES));
+        SaslRoleToken token = new SaslRoleToken(map.get(USER_ROLE), map.get(SESSION));
+        token.setExpires(expires);
+        return token;
+    }
+
+    /**
+     * Splits the string representation of a token into attributes pairs.
+     *
+     * @param tokenStr string representation of a token.
+     *
+     * @return a map with the attribute pairs of the token.
+     *
+     * @throws AuthenticationException thrown if the string representation of the token could not be broken into
+     * attribute pairs.
+     */
+    private static Map<String, String> split(String tokenStr) throws AuthenticationException {
+        Map<String, String> map = new HashMap<String, String>();
+        StringTokenizer st = new StringTokenizer(tokenStr, ATTR_SEPARATOR);
+        while (st.hasMoreTokens()) {
+            String part = st.nextToken();
+            int separator = part.indexOf('=');
+            if (separator == -1) {
+                throw new AuthenticationException("Invalid authentication token");
+            }
+            String key = part.substring(0, separator);
+            String value = part.substring(separator + 1);
+            map.put(key, value);
+        }
+        return map;
+    }
+
+}
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslRoleTokenSigner.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslRoleTokenSigner.java
new file mode 100644
index 0000000..dee320f
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslRoleTokenSigner.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import javax.naming.AuthenticationException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+
+@Slf4j
+public class SaslRoleTokenSigner {
+    private static final String SIGNATURE = "&s=";
+
+    private byte[] secret;
+
+    /**
+     * Creates a SaslRoleTokenSigner instance using the specified secret.
+     *
+     * @param secret secret to use for creating the digest.
+     */
+    public SaslRoleTokenSigner(byte[] secret) {
+        if (secret == null) {
+            throw new IllegalArgumentException("secret cannot be NULL");
+        }
+        this.secret = secret.clone();
+    }
+
+    /**
+     * Returns a signed string.
+     * <p/>
+     * The signature '&s=SIGNATURE' is appended at the end of the string.
+     *
+     * @param str string to sign.
+     *
+     * @return the signed string.
+     */
+    public String sign(String str) {
+        if (str == null || str.length() == 0) {
+            throw new IllegalArgumentException("NULL or empty string to sign");
+        }
+        String signature = computeSignature(str);
+        return str + SIGNATURE + signature;
+    }
+
+    /**
+     * Verifies a signed string and extracts the original string.
+     *
+     * @param signedStr the signed string to verify and extract.
+     *
+     * @return the extracted original string.
+     *
+     * @throws AuthenticationException thrown if the given string is not a signed string or if the signature is invalid.
+     */
+    public String verifyAndExtract(String signedStr) throws AuthenticationException {
+        int index = signedStr.lastIndexOf(SIGNATURE);
+        if (index == -1) {
+            throw new AuthenticationException("Invalid signed text: " + signedStr);
+        }
+        String originalSignature = signedStr.substring(index + SIGNATURE.length());
+        String rawValue = signedStr.substring(0, index);
+        String currentSignature = computeSignature(rawValue);
+        if (!originalSignature.equals(currentSignature)) {
+            throw new AuthenticationException("Invalid signature");
+        }
+        return rawValue;
+    }
+
+    /**
+     * Returns the signature of a string.
+     *
+     * @param str string to sign.
+     *
+     * @return the signature for the string.
+     */
+    protected String computeSignature(String str) {
+        try {
+            MessageDigest md = MessageDigest.getInstance("SHA");
+
+            md.update(str.getBytes());
+
+            md.update(secret);
+            byte[] digest = md.digest();
+            return new Base64(0).encodeToString(digest);
+        } catch (NoSuchAlgorithmException ex) {
+            throw new RuntimeException("It should not happen, " + ex.getMessage(), ex);
+        }
+    }
+}
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index 9ab711f..2d9d51d 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -30,10 +30,12 @@ import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.Configuration;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
@@ -99,7 +101,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 		File keytabProxy = new File(kerberosWorkDir, "pulsarproxy.keytab");
 		kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
 
-		File jaasFile = new File(kerberosWorkDir, "jaas.properties");
+		File jaasFile = new File(kerberosWorkDir, "jaas.conf");
 		try (FileWriter writer = new FileWriter(jaasFile)) {
 			writer.write("\n"
 				+ "PulsarBroker {\n"
@@ -135,7 +137,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 			);
 		}
 
-		File krb5file = new File(kerberosWorkDir, "krb5.properties");
+		File krb5file = new File(kerberosWorkDir, "krb5.conf");
 		try (FileWriter writer = new FileWriter(krb5file)) {
 			String conf = "[libdefaults]\n"
 				+ " default_realm = " + kdc.getRealm() + "\n"
@@ -147,11 +149,11 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 				+ "  kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
 				+ " }";
 			writer.write(conf);
-			log.info("krb5.properties:\n" + conf);
+			log.info("krb5.conf:\n" + conf);
 		}
 
 		System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
-		System.setProperty("java.security.krb5.properties", krb5file.getAbsolutePath());
+		System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
 		Configuration.getConfiguration().refresh();
 
 		// Client config
@@ -162,7 +164,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 	@AfterClass
 	public static void stopMiniKdc() {
 		System.clearProperty("java.security.auth.login.config");
-		System.clearProperty("java.security.krb5.properties");
+		System.clearProperty("java.security.krb5.conf");
 		if (kdc != null) {
 			kdc.stop();
 		}
@@ -181,18 +183,27 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 		isTcpLookup = true;
 		conf.setAdvertisedAddress(localHostname);
 		conf.setAuthenticationEnabled(true);
-		conf.setSaslAuthentication(true);
 		conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
 		conf.setSaslJaasServerSectionName("PulsarBroker");
 		Set<String> providers = new HashSet<>();
 		providers.add(AuthenticationProviderSasl.class.getName());
 		conf.setAuthenticationProviders(providers);
 		conf.setClusterName("test");
+		conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));
 
 		super.init();
 
 		lookupUrl = new URI("broker://" + "localhost" + ":" + BROKER_PORT);
 
+		// set admin auth, to verify admin web resources
+		Map<String, String> clientSaslConfig = Maps.newHashMap();
+		clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+		clientSaslConfig.put("serverType", "broker");
+		log.info("set client jaas section name: PulsarClient");
+		admin = PulsarAdmin.builder()
+			.serviceHttpUrl(brokerUrl.toString())
+			.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
+			.build();
 		super.producerBaseSetup();
 		log.info("-- {} --, end.", methodName);
 	}
@@ -208,7 +219,6 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 		log.info("-- Starting {} test --", methodName);
 
 		// Step 1: Create Admin Client
-		//updateAdminClient();
 		final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
 		// create a client which connects to proxy and pass authData
 		String topicName = "persistent://my-property/my-ns/my-topic1";
@@ -223,9 +233,6 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 
 		// proxy connect to broker
 		proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
-		/*proxyConfig.setBrokerClientAuthenticationParameters(
-			"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
-				"\"serverType\": " + "\"broker\"}");*/
 		proxyConfig.setBrokerClientAuthenticationParameters(
 			"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
 				"\"serverType\": " + "\"broker\"}");
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index e098bcd..1a734f9 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.pulsar.broker.authentication;
 
-import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileWriter;
 import java.net.URI;
@@ -29,11 +28,17 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
 import javax.security.auth.login.Configuration;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -42,6 +47,8 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
+import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.sasl.SaslConstants;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -72,8 +79,9 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
         String principalServerNoRealm = "broker/" + localHostname;
         String principalServer = "broker/" + localHostname + "@" + kdc.getRealm();
         log.info("principalServer: " + principalServer);
-        String principalClientNoRealm = "client/" + localHostname;
+        String principalClientNoRealm = "client";
         String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
+
         log.info("principalClient: " + principalClient);
 
         File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
@@ -82,7 +90,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
         File keytabServer = new File(kerberosWorkDir, "pulsarbroker.keytab");
         kdc.createPrincipal(keytabServer, principalServerNoRealm);
 
-        File jaasFile = new File(kerberosWorkDir, "jaas.properties");
+        File jaasFile = new File(kerberosWorkDir, "jaas.conf");
         try (FileWriter writer = new FileWriter(jaasFile)) {
             writer.write("\n"
                 + "PulsarBroker {\n"
@@ -107,7 +115,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
             );
         }
 
-        File krb5file = new File(kerberosWorkDir, "krb5.properties");
+        File krb5file = new File(kerberosWorkDir, "krb5.conf");
         try (FileWriter writer = new FileWriter(krb5file)) {
             String conf = "[libdefaults]\n"
                 + " default_realm = " + kdc.getRealm() + "\n"
@@ -119,16 +127,17 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
                 + "  kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
                 + " }";
             writer.write(conf);
-            log.info("krb5.properties:\n" + conf);
+            log.info("krb5.conf:\n" + conf);
         }
 
         System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
-        System.setProperty("java.security.krb5.properties", krb5file.getAbsolutePath());
+        System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
         Configuration.getConfiguration().refresh();
 
         // Client config
         Map<String, String> clientSaslConfig = Maps.newHashMap();
         clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "broker");
         log.info("set client jaas section name: PulsarClient");
         authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);
         log.info("created AuthenticationSasl");
@@ -137,7 +146,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
     @AfterClass
     public static void stopMiniKdc() {
         System.clearProperty("java.security.auth.login.config");
-        System.clearProperty("java.security.krb5.properties");
+        System.clearProperty("java.security.krb5.conf");
         if (kdc != null) {
             kdc.stop();
         }
@@ -151,26 +160,39 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
     @Override
     protected void setup() throws Exception {
         log.info("-- {} --, start at host: {}", methodName, localHostname);
-        isTcpLookup = true;
+        // use http lookup to verify HttpClient works well.
+        isTcpLookup = false;
+
         conf.setAdvertisedAddress(localHostname);
         conf.setAuthenticationEnabled(true);
-        conf.setSaslAuthentication(true);
-        conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+        conf.setSaslJaasClientAllowedIds(".*" + "client" + ".*");
         conf.setSaslJaasServerSectionName("PulsarBroker");
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderSasl.class.getName());
         conf.setAuthenticationProviders(providers);
         conf.setClusterName("test");
+        conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm()));
 
         super.init();
 
-        lookupUrl = new URI("broker://" + "localhost" + ":" + BROKER_PORT);
+        lookupUrl = new URI("http://" + "localhost" + ":" + BROKER_WEBSERVICE_PORT);
+
         pulsarClient = PulsarClient.builder()
             .serviceUrl(lookupUrl.toString())
             .statsInterval(0, TimeUnit.SECONDS)
             .authentication(authSasl).build();
 
+        // set admin auth, to verify admin web resources
+        Map<String, String> clientSaslConfig = Maps.newHashMap();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "broker");
+        log.info("set client jaas section name: PulsarClient");
+        admin = PulsarAdmin.builder()
+            .serviceHttpUrl(brokerUrl.toString())
+            .authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
+            .build();
         log.info("-- {} --, end.", methodName);
+
         super.producerBaseSetup();
     }
 
@@ -217,4 +239,51 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
         log.info("-- {} -- end", methodName);
     }
 
+    /**
+     * Walks a client and a server-side auth state through a full SASL exchange, then checks that
+     * both the finished state and a fresh state refuse data taken from that exchange.
+     */
+    @Test
+    public void testSaslServerAndClientAuth() throws Exception {
+        log.info("-- {} -- start", methodName);
+        String hostName = "localhost";
+
+        // client side data provider, and a server side state created by the broker's sasl provider
+        AuthenticationDataProvider clientData = authSasl.getAuthData(hostName);
+        AuthenticationProviderSasl saslProvider = (AuthenticationProviderSasl) pulsar.getBrokerService()
+            .getAuthenticationService()
+            .getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME);
+        AuthenticationState serverState = saslProvider.newAuthState(null, null, null);
+
+        // round 1: client opens the exchange, server answers but is not complete yet
+        AuthData clientRound1 = clientData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData serverRound1 = serverState.authenticate(clientRound1);
+        Assert.assertFalse(serverState.isComplete());
+
+        // round 2: server completes and has no further bytes to send
+        AuthData clientRound2 = clientData.authenticate(serverRound1);
+        AuthData serverRound2 = serverState.authenticate(clientRound2);
+        Assert.assertTrue(serverState.isComplete());
+        Assert.assertNull(serverRound2.getBytes());
+
+        // a completed state must reject another authenticate call
+        try {
+            serverState.authenticate(clientRound2);
+            Assert.fail("Expected fail because auth completed for authState");
+        } catch (Exception expected) {
+            // expected
+        }
+
+        // a brand new state must reject data from the finished exchange
+        try {
+            saslProvider.newAuthState(null, null, null).authenticate(clientRound1);
+            Assert.fail("Expected fail. server is auth old client data");
+        } catch (Exception expected) {
+            // expected
+        }
+
+        log.info("-- {} -- end", methodName);
+    }
+
 }
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslServerTokenSignerTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslServerTokenSignerTest.java
new file mode 100644
index 0000000..f038158
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslServerTokenSignerTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import javax.naming.AuthenticationException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class SaslServerTokenSignerTest {
+
+    @Test
+    public void testNoSecret() throws Exception {
+        try {
+            new SaslRoleTokenSigner(null);
+            Assert.fail();
+        }
+        catch (IllegalArgumentException ex) {
+        }
+    }
+
+    @Test
+    public void testNullAndEmptyString() throws Exception {
+        SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
+        try {
+            signer.sign(null);
+            Assert.fail("Null String should Failed");
+        } catch (IllegalArgumentException ex) {
+            // Expected
+        } catch (Throwable ex) {
+            Assert.fail("Null String should Failed with IllegalArgumentException.");
+        }
+        try {
+            signer.sign("");
+            Assert.fail("Empty String should Failed");
+        } catch (IllegalArgumentException ex) {
+            // Expected
+        } catch (Throwable ex) {
+            Assert.fail("Empty String should Failed with IllegalArgumentException.");
+        }
+    }
+
+    @Test
+    public void testSignature() throws Exception {
+        SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
+        String s1 = signer.sign("ok");
+        String s2 = signer.sign("ok");
+        String s3 = signer.sign("wrong");
+        Assert.assertEquals(s1, s2);
+        Assert.assertNotSame(s1, s3);
+    }
+
+    @Test
+    public void testVerify() throws Exception {
+        SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
+        String t = "test";
+        String s = signer.sign(t);
+        String e = signer.verifyAndExtract(s);
+        Assert.assertEquals(t, e);
+        Assert.assertNotEquals(t, s);
+
+    }
+
+    @Test
+    public void testInvalidSignedText() throws Exception {
+        SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
+        try {
+            signer.verifyAndExtract("test");
+            Assert.fail();
+        } catch (AuthenticationException ex) {
+            // Expected
+        } catch (Throwable ex) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testTampering() throws Exception {
+        SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
+        String t = "test";
+        String s = signer.sign(t);
+        s += "x";
+        try {
+            signer.verifyAndExtract(s);
+            Assert.fail();
+        } catch (AuthenticationException ex) {
+            // Expected
+        } catch (Throwable ex) {
+            Assert.fail();
+        }
+    }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b3262c1..694b824 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -604,16 +604,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String anonymousUserRole = null;
 
-
-    @FieldContext(
-        category = CATEGORY_SASL_AUTH,
-        doc = "Whether Use SASL Authentication or not"
-    )
-    // TODO: isSaslAuthentication used to bypass web resource check.
-    //  will remove it after implementation the support.
-    //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
-    private boolean isSaslAuthentication = false;
-
     @FieldContext(
         category = CATEGORY_SASL_AUTH,
         doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
index f9a2d03..09bbe42 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
@@ -25,6 +25,9 @@ import java.net.SocketAddress;
 import javax.naming.AuthenticationException;
 
 import javax.net.ssl.SSLSession;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.AuthData;
 
@@ -73,4 +76,11 @@ public interface AuthenticationProvider extends Closeable {
         return new OneStageAuthenticationState(authData, remoteAddress, sslSession, this);
     }
 
+    /**
+     * Authenticate an HTTP request, setting the response according to the passed in request.
+     *
+     * <p>The default implementation does not support this operation and always throws.
+     *
+     * @param request the incoming HTTP request.
+     * @param response the HTTP response to set.
+     * @return whether the caller should go on with the following {@code chain.doFilter} or not.
+     * @throws Exception if authentication fails; the default implementation always throws
+     *             {@link AuthenticationException} with the message "Not supported".
+     */
+    default boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
+        throw new AuthenticationException("Not supported");
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
index 4248b6b..5042ef8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
@@ -20,6 +20,8 @@
 package org.apache.pulsar.broker.authentication;
 
 import javax.naming.AuthenticationException;
+import javax.servlet.http.HttpServletRequest;
+
 import org.apache.pulsar.common.api.AuthData;
 
 /**
@@ -47,7 +49,14 @@ public interface AuthenticationState {
     AuthenticationDataSource getAuthDataSource();
 
     /**
-     * Whether the authentication is completed or not
+     * Whether the authentication is completed or not.
      */
     boolean isComplete();
+
+    /**
+     * Get the ID of this AuthenticationState.
+     *
+     * @return the state ID; the default implementation returns {@code -1}.
+     */
+    default long getStateId() {
+        return -1L;
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index c5be5ec..6b4fc8c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pulsar.broker.web;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.io.IOException;
 
-import javax.naming.AuthenticationException;
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
@@ -31,8 +32,8 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.common.sasl.SaslConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,27 +52,52 @@ public class AuthenticationFilter implements Filter {
         this.authenticationService = authenticationService;
     }
 
+    /**
+     * Tells whether the request carries the SASL type header with the SASL type value.
+     *
+     * @param request the incoming HTTP request.
+     * @return true when the header is present, non-empty and equals the expected value ignoring case.
+     */
+    private boolean isSaslRequest(HttpServletRequest request) {
+        String saslType = request.getHeader(SaslConstants.SASL_HEADER_TYPE);
+        if (saslType == null || saslType.isEmpty()) {
+            return false;
+        }
+        return saslType.equalsIgnoreCase(SaslConstants.SASL_TYPE_VALUE);
+    }
+
     @Override
     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
             throws IOException, ServletException {
-
         try {
-            String role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
-            request.setAttribute(AuthenticatedRoleAttributeName, role);
-            request.setAttribute(AuthenticatedDataAttributeName,
+            HttpServletRequest httpRequest = (HttpServletRequest) request;
+            HttpServletResponse httpResponse = (HttpServletResponse) response;
+
+            if (!isSaslRequest(httpRequest)) {
+                // not sasl type, return role directly.
+                String role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
+                request.setAttribute(AuthenticatedRoleAttributeName, role);
+                request.setAttribute(AuthenticatedDataAttributeName,
                     new AuthenticationDataHttps((HttpServletRequest) request));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
+                }
+                chain.doFilter(request, response);
+                return;
+            }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
+            boolean doFilter = authenticationService
+                .getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME)
+                .authenticateHttpRequest(httpRequest, httpResponse);
+
+            if (doFilter) { // the sasl provider decides whether the rest of the chain runs
+                chain.doFilter(request, response);
             }
-        } catch (AuthenticationException e) {
+        } catch (Exception e) { // NOTE(review): chain.doFilter is inside this try; downstream failures also become 401
             HttpServletResponse httpResponse = (HttpServletResponse) response;
             httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
             LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
             return;
         }
-
-        chain.doFilter(request, response);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ff0c27e..83861b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -226,11 +226,8 @@ public class ServerCnx extends PulsarHandler {
      * - originalPrincipal is not blank
      * - originalPrincipal is not a proxy principal
      */
-    //TODO: for sasl proxy.
-    // github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
     private boolean invalidOriginalPrincipal(String originalPrincipal) {
         return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()
-            && !isSaslAuthenticationMethod()
             && proxyRoles.contains(authRole) && (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal)));
     }
 
@@ -1485,10 +1482,6 @@ public class ServerCnx extends PulsarHandler {
         }
     }
 
-    private boolean isSaslAuthenticationMethod(){
-        return authMethod.equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME);
-    }
-
     private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5b884b7..97eca09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -27,7 +27,6 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -42,6 +41,10 @@ import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -49,7 +52,11 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.common.naming.*;
+import org.apache.pulsar.common.naming.Constants;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -58,12 +65,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
 /**
  * Base class for Web resources in Pulsar. It provides basic authorization functions.
  */
@@ -166,10 +167,7 @@ public abstract class PulsarWebResource {
      *             if not authorized
      */
     protected void validateSuperUserAccess() {
-        // TODO: isSaslAuthentication used to bypass web resource check.
-        //  will remove it after implementation the support.
-        //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
-        if (config().isAuthenticationEnabled() && !config().isSaslAuthentication()) {
+        if (config().isAuthenticationEnabled()) {
             String appId = clientAppId();
             if(log.isDebugEnabled()) {
                 log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),
@@ -245,7 +243,7 @@ public abstract class PulsarWebResource {
             throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
         }
 
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication() && pulsar.getConfiguration().isAuthorizationEnabled()) {
+        if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId)) {
                 throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index ef5c3c6..3ae55bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -133,10 +133,7 @@ public class WebService implements AutoCloseable {
             });
         }
 
-        // TODO: isSaslAuthentication used to bypass web resource check.
-        //  will remove it after implementation the support.
-        //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
-        if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication()) {
+        if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
             FilterHolder filter = new FilterHolder(new AuthenticationFilter(
                                                            pulsar.getBrokerService().getAuthenticationService()));
             context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index bde02d2..2c13985 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.admin.internal;
 
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
@@ -43,6 +45,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -59,17 +63,53 @@ public abstract class BaseResource {
 
     public Builder request(final WebTarget target) throws PulsarAdminException {
         try {
-            Builder builder = target.request(MediaType.APPLICATION_JSON);
-            // Add headers for authentication if any
-            if (auth != null && auth.getAuthData().hasDataForHttp()) {
-                for (Map.Entry<String, String> header : auth.getAuthData().getHttpHeaders()) {
-                    builder.header(header.getKey(), header.getValue());
-                }
+            return requestAsync(target).get();
+        } catch (Exception e) {
+            throw new GettingAuthenticationDataException(e);
+        }
+    }
+
+    // Runs the authentication stage (if any); the returned future completes with a Builder carrying the auth headers.
+    public CompletableFuture<Builder> requestAsync(final WebTarget target) {
+        CompletableFuture<Builder> builderFuture = new CompletableFuture<>();
+        CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
+        try {
+            AuthenticationDataProvider authData = auth.getAuthData(target.getUri().getHost());
+
+            if (authData.hasDataForHttp()) {
+                auth.authenticationStage(target.getUri().toString(), authData, null, authFuture);
+            } else {
+                authFuture.complete(null);
             }
-            return builder;
+
+            // auth stage finished (successfully or not): build the request or propagate the failure
+            authFuture.whenComplete((respHeaders, ex) -> {
+                if (ex != null) {
+                    log.warn("[{}] Failed to perform http request at authn stage: {}",
+                        target.getUri(), ex.getMessage());
+                    builderFuture.completeExceptionally(new PulsarClientException(ex));
+                    return;
+                }
+
+                try {
+                    Builder builder = target.request(MediaType.APPLICATION_JSON);
+                    if (authData.hasDataForHttp()) {
+                        Set<Entry<String, String>> headers =
+                            auth.newRequestHeader(target.getUri().toString(), authData, respHeaders);
+                        if (headers != null) {
+                            headers.forEach(entry -> builder.header(entry.getKey(), entry.getValue()));
+                        }
+                    }
+                    builderFuture.complete(builder);
+                } catch (Throwable t) {
+                    builderFuture.completeExceptionally(new GettingAuthenticationDataException(t));
+                }
+            });
         } catch (Throwable t) {
-            throw new GettingAuthenticationDataException(t);
+            builderFuture.completeExceptionally(new GettingAuthenticationDataException(t));
         }
+
+        return builderFuture;
     }
 
     public <T> CompletableFuture<Void> asyncPutRequest(final WebTarget target, Entity<T> entity) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
index 36b0b2d..5d91f81 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
@@ -20,9 +20,16 @@ package org.apache.pulsar.client.admin.internal;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.common.sasl.SaslConstants;
 import org.asynchttpclient.RequestBuilder;
 
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.client.WebTarget;
 
 public class ComponentResource extends BaseResource {
 
@@ -30,18 +37,32 @@ public class ComponentResource extends BaseResource {
         super(auth);
     }
 
-    public RequestBuilder addAuthHeaders(RequestBuilder requestBuilder) throws PulsarAdminException {
+    public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder requestBuilder) throws PulsarAdminException {
 
         try {
-            if (auth != null && auth.getAuthData().hasDataForHttp()) {
-                for (Map.Entry<String, String> header : auth.getAuthData().getHttpHeaders()) {
-                    requestBuilder.addHeader(header.getKey(), header.getValue());
+            if (auth != null) {
+                Set<Entry<String, String>> headers = getAuthHeaders(target);
+                if (headers != null && !headers.isEmpty()) {
+                    headers.forEach(header -> requestBuilder.addHeader(header.getKey(), header.getValue()));
                 }
             }
-
             return requestBuilder;
         } catch (Throwable t) {
             throw new PulsarAdminException.GettingAuthenticationDataException(t);
         }
     }
+
+    private Set<Entry<String, String>> getAuthHeaders(WebTarget target) throws Exception {
+        AuthenticationDataProvider authData = auth.getAuthData(target.getUri().getHost());
+        String targetUrl = target.getUri().toString();
+        if (auth.getAuthMethodName().equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME)) {
+            CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
+            auth.authenticationStage(targetUrl, authData, null, authFuture);
+            return auth.newRequestHeader(targetUrl, authData, authFuture.get());
+        } else if (authData.hasDataForHttp()) {
+            return auth.newRequestHeader(targetUrl, authData, null);
+        } else {
+            return null;
+        }
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 2d5d6e5..5cfd13f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -167,7 +167,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 // If the function code is built in, we don't need to submit here
                builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
 
             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
@@ -215,7 +215,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 // If the function code is built in, we don't need to submit here
                 builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
 
             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
@@ -334,7 +334,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                     .addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
                     .addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
 
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
             }
@@ -358,7 +358,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
             RequestBuilder builder = get(target.getUri().toASCIIString());
 
             Future<HttpResponseStatus> whenStatusCode
-                    = asyncHttpClient.executeRequest(addAuthHeaders(builder).build(), new AsyncHandler<HttpResponseStatus>() {
+                    = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(), new AsyncHandler<HttpResponseStatus>() {
                 private HttpResponseStatus status;
 
                 @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 592d879..79e0206 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -125,7 +125,7 @@ public class SinkImpl extends ComponentResource implements Sink {
                 // If the function code is built in, we don't need to submit here
                 builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build()).get();
 
             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
@@ -173,7 +173,7 @@ public class SinkImpl extends ComponentResource implements Sink {
                 // If the function code is built in, we don't need to submit here
                 builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build()).get();
 
             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 5103375..71fc24b 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -125,7 +125,7 @@ public class SourceImpl extends ComponentResource implements Source {
                 // If the function code is built in, we don't need to submit here
                 builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build()).get();
             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
             }
@@ -171,7 +171,7 @@ public class SourceImpl extends ComponentResource implements Source {
                 // If the function code is built in, we don't need to submit here
                 builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build()).get();
 
             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
index 8cb4407..6dd7e3e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.client.api;
 import java.io.Closeable;
 import java.io.Serializable;
 import java.util.Map;
-import javax.naming.AuthenticationException;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 
 /**
@@ -75,4 +78,25 @@ public interface Authentication extends Closeable, Serializable {
      * Initialize the authentication provider
      */
     void start() throws PulsarClientException;
+
+    /**
+     * An authentication stage that runs before the actual http request is built.
+     * When the stage completes, the passed-in authFuture is completed; this default completes it with {@code null}.
+     */
+    default void authenticationStage(String requestUrl,
+                                     AuthenticationDataProvider authData,
+                                     Map<String, String> previousResHeaders,
+                                     CompletableFuture<Map<String, String>> authFuture) {
+        authFuture.complete(null);
+    }
+
+    /**
+     * Builds the http headers for the next request; this default returns {@code authData.getHttpHeaders()} as is.
+     */
+    default Set<Entry<String, String>> newRequestHeader(String hostName,
+                                                        AuthenticationDataProvider authData,
+                                                        Map<String, String> previousResHeaders) throws Exception {
+        return authData.getHttpHeaders();
+    }
+
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index 9a0df5d..8f6a4c4 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -88,7 +88,7 @@ public interface AuthenticationDataProvider extends Serializable {
      *
      * @return an enumeration of all the header names
      */
-    default Set<Map.Entry<String, String>> getHttpHeaders() {
+    default Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
         return null;
     }
 
diff --git a/pulsar-client-auth-sasl/pom.xml b/pulsar-client-auth-sasl/pom.xml
index 903fe3c..2879b44 100644
--- a/pulsar-client-auth-sasl/pom.xml
+++ b/pulsar-client-auth-sasl/pom.xml
@@ -57,5 +57,10 @@
       <artifactId>lombok</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>javax.ws.rs</groupId>
+      <artifactId>javax.ws.rs-api</artifactId>
+    </dependency>
+
   </dependencies>
 </project>
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index b8eca39..6707e38 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -19,13 +19,44 @@
 
 package org.apache.pulsar.client.impl.auth;
 
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.common.sasl.SaslConstants.AUTH_METHOD_NAME;
+import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_SECTION_NAME;
+import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN_EXPIRED;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_TOKEN;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_BROKER_PROTOCOL;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_HEADER_STATE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_HEADER_TYPE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_SERVER_TYPE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_CLIENT_INIT;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_COMPLETE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_NEGOTIATE;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER_CHECK_TOKEN;
+import static org.apache.pulsar.common.sasl.SaslConstants.SASL_TYPE_VALUE;
 
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.Base64;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import javax.security.auth.login.LoginException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation.Builder;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -33,8 +64,8 @@ import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.AuthenticationUtil;
 import org.apache.pulsar.client.impl.auth.PulsarSaslClient.ClientCallbackHandler;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.sasl.JAASCredentialsContainer;
-import org.apache.pulsar.common.sasl.SaslConstants;
 
 /**
  * Authentication provider for SASL based authentication.
@@ -59,7 +90,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
 
     @Override
     public String getAuthMethodName() {
-        return SaslConstants.AUTH_METHOD_NAME;
+        return AUTH_METHOD_NAME;
     }
 
     @Override
@@ -78,7 +109,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
     public void configure(String encodedAuthParamString) {
         if (isBlank(encodedAuthParamString)) {
             log.info("authParams for SASL is be empty, will use default JAAS client section name: {}",
-                SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+                JAAS_DEFAULT_CLIENT_SECTION_NAME);
         }
 
         try {
@@ -104,9 +135,9 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
 
         // read section from config files of kerberos
         this.loginContextName = authParams
-            .getOrDefault(SaslConstants.JAAS_CLIENT_SECTION_NAME, SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+            .getOrDefault(JAAS_CLIENT_SECTION_NAME, JAAS_DEFAULT_CLIENT_SECTION_NAME);
         this.serverType = authParams
-            .getOrDefault(SaslConstants.SASL_SERVER_TYPE, SaslConstants.SASL_BROKER_PROTOCOL);
+            .getOrDefault(SASL_SERVER_TYPE, SASL_BROKER_PROTOCOL);
 
         // init the static jaasCredentialsContainer that shares amongst client.
         if (!initializedJAAS) {
@@ -130,11 +161,182 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
 
     @Override
     public void start() throws PulsarClientException {
-
+        client = ClientBuilder.newClient();
     }
 
     @Override
     public void close() throws IOException {
+        if(client != null) {
+            client.close();
+        }
+    }
+
+    private String saslRoleToken = null;
+    private Client client = null;
+
+    // role token exists but expired return true
+    private boolean isRoleTokenExpired(Map<String, String> responseHeaders) {
+        if ((saslRoleToken != null)
+            && (responseHeaders != null)
+            // header type match
+            && (responseHeaders.get(SASL_HEADER_TYPE) != null && responseHeaders.get(SASL_HEADER_TYPE)
+                    .equalsIgnoreCase(SASL_TYPE_VALUE))
+            // header state expired
+            && (responseHeaders.get(SASL_HEADER_STATE) != null && responseHeaders.get(SASL_HEADER_STATE)
+                    .equalsIgnoreCase(SASL_AUTH_ROLE_TOKEN_EXPIRED))) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @SneakyThrows(Exception.class)
+    private Builder newRequestBuilder(WebTarget target,
+                                      AuthenticationDataProvider authData,
+                                      Map<String, String> previousResHeaders) {
+        Builder builder = target.request(MediaType.APPLICATION_JSON);
+        Set<Entry<String, String>>  headers = newRequestHeader(
+            target.getUri().toString(),
+            authData,
+            previousResHeaders);
+
+        headers.forEach(entry -> {
+            builder.header(entry.getKey(), entry.getValue());
+        });
+        return builder;
+    }
+
+    // Builds the sasl headers for the next request from the cached role token and the previous response headers.
+    @Override
+    public Set<Entry<String, String>> newRequestHeader(String hostName,
+                                                       AuthenticationDataProvider authData,
+                                                       Map<String, String> previousRespHeaders) throws Exception {
+
+        Map<String, String> headers = Maps.newHashMap();
+
+        if (authData.hasDataForHttp()) {
+            authData.getHttpHeaders().forEach(header ->
+                headers.put(header.getKey(), header.getValue())
+            );
+        }
+
+        // role token expired in last check. remove role token, new sasl client, restart auth.
+        if (isRoleTokenExpired(previousRespHeaders)) {
+            previousRespHeaders = null;
+            saslRoleToken = null;
+            authData = getAuthData(hostName);
+        }
+
+        // role token is not expired and OK to use.
+        // 1. first time request, send server to check if expired.
+        // 2. server checked, and return SASL_STATE_COMPLETE, ask server to complete auth
+        // 3. server checked, and did not return SASL_STATE_COMPLETE (or returned no state header at all)
+        if (saslRoleToken != null) {
+            headers.put(SASL_AUTH_ROLE_TOKEN, saslRoleToken);
+            if (previousRespHeaders == null) {
+                // first time auth, ask server to check the role token expired or not.
+                if (log.isDebugEnabled()) {
+                    log.debug("request builder add token: Check token");
+                }
+                headers.put(SASL_HEADER_STATE, SASL_STATE_SERVER_CHECK_TOKEN);
+            } else if (SASL_STATE_COMPLETE.equalsIgnoreCase(previousRespHeaders.get(SASL_HEADER_STATE))) {
+                headers.put(SASL_HEADER_STATE, SASL_STATE_COMPLETE);
+                if (log.isDebugEnabled()) {
+                    log.debug("request builder add token. role verified by server");
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("request builder add token. NOT complete. state: {}",
+                        previousRespHeaders.get(SASL_HEADER_STATE));
+                }
+                headers.put(SASL_HEADER_STATE, SASL_STATE_NEGOTIATE);
+            }
+            return headers.entrySet();
+        }
+
+        // role token is null, need do auth.
+        if (previousRespHeaders == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Init authn in client side");
+            }
+            // first time init
+            headers.put(SASL_HEADER_STATE, SASL_STATE_CLIENT_INIT);
+            AuthData initData = authData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+            headers.put(SASL_AUTH_TOKEN,
+                Base64.getEncoder().encodeToString(initData.getBytes()));
+        } else {
+            AuthData brokerData = AuthData.of(
+                Base64.getDecoder().decode(
+                    previousRespHeaders.get(SASL_AUTH_TOKEN)));
+            AuthData clientData = authData.authenticate(brokerData);
+
+            headers.put(SASL_STATE_SERVER, previousRespHeaders.get(SASL_STATE_SERVER));
+            headers.put(SASL_HEADER_TYPE, SASL_TYPE_VALUE);
+            headers.put(SASL_HEADER_STATE, SASL_STATE_NEGOTIATE);
+            headers.put(SASL_AUTH_TOKEN,
+                Base64.getEncoder().encodeToString(clientData.getBytes()));
+        }
+
+        return headers.entrySet();
+    }
+
+    private Map<String, String> getHeaders(Response response) {
+        Map<String, String> headers = Maps.newHashMap();
+        String saslHeader = response.getHeaderString(SASL_HEADER_TYPE);
+        String headerState = response.getHeaderString(SASL_HEADER_STATE);
+        String authToken = response.getHeaderString(SASL_AUTH_TOKEN);
+        String serverStateId = response.getHeaderString(SASL_STATE_SERVER);
+
+        if (saslRoleToken != null) {
+            headers.put(SASL_AUTH_ROLE_TOKEN, saslRoleToken);
+        }
+
+        headers.put(SASL_HEADER_TYPE, saslHeader);
+        headers.put(SASL_HEADER_STATE, headerState);
+        headers.put(SASL_AUTH_TOKEN, authToken);
+        headers.put(SASL_STATE_SERVER, serverStateId);
+        return headers;
     }
 
+    @Override
+    public void authenticationStage(String requestUrl,
+                                    AuthenticationDataProvider authData,
+                                    Map<String, String> previousResHeaders,
+                                    CompletableFuture<Map<String, String>> authFuture) {
+        // send one http request carrying the sasl headers; the callback repeats the stage while the server answers 401
+        Builder builder = newRequestBuilder(client.target(requestUrl), authData, previousResHeaders);
+        builder.async().get(new InvocationCallback<Response>() {
+            @Override
+            public void completed(Response response) {
+                if (response.getStatus() == HTTP_UNAUTHORIZED) {
+                    // sasl auth on going: start the next round using the headers the server just returned
+                    try {
+                        authenticationStage(requestUrl, authData, getHeaders(response), authFuture);
+                    } catch (Throwable t) {
+                        // building the next request failed; nothing else here would complete authFuture
+                        authFuture.completeExceptionally(new PulsarClientException(t));
+                    }
+                } else if (response.getStatus() != HttpURLConnection.HTTP_OK) {
+                    log.warn("HTTP get request failed: {}", response.getStatusInfo());
+                    authFuture.completeExceptionally(new PulsarClientException("Sasl Auth request failed: " + response.getStatus()));
+                } else {
+                    if (response.getHeaderString(SASL_AUTH_ROLE_TOKEN) != null) {
+                        saslRoleToken = response.getHeaderString(SASL_AUTH_ROLE_TOKEN);
+                    }
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Complete auth with saslRoleToken: {}", saslRoleToken);
+                    }
+                    authFuture.complete(getHeaders(response));
+                }
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                // pass the throwable as the exception argument so the stack trace is logged
+                log.warn("Failed to perform http request", throwable);
+                authFuture.completeExceptionally(new PulsarClientException(throwable));
+            }
+        });
+    }
 }
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
index 6f38bd7..261e06c 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
@@ -19,12 +19,18 @@
 package org.apache.pulsar.client.impl.auth;
 
 import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.naming.AuthenticationException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.sasl.SaslConstants;
 
 @Slf4j
 public class SaslAuthenticationDataProvider implements AuthenticationDataProvider {
@@ -54,4 +60,17 @@ public class SaslAuthenticationDataProvider implements AuthenticationDataProvide
 
         return pulsarSaslClient.evaluateChallenge(commandData);
     }
+
+    @Override
+    public boolean hasDataForHttp() {
+        return true;
+    }
+
+    @Override
+    public Set<Entry<String, String>> getHttpHeaders() throws Exception {
+        Map<String, String> headers = new HashMap<>();
+        headers.put(SaslConstants.SASL_HEADER_TYPE, SaslConstants.SASL_TYPE_VALUE);
+        return headers.entrySet();
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 91818ab..96c62d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -18,22 +18,20 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.ssl.SslContext;
-
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -41,19 +39,16 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
-import org.asynchttpclient.AsyncCompletionHandler;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.AsyncHttpClientConfig;
 import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
-import org.asynchttpclient.ListenableFuture;
 import org.asynchttpclient.Request;
-import org.asynchttpclient.Response;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+
+@Slf4j
 public class HttpClient implements Closeable {
 
     protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -133,56 +128,76 @@ public class HttpClient implements Closeable {
         final CompletableFuture<T> future = new CompletableFuture<>();
         try {
             String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
-            AuthenticationDataProvider authData = authentication.getAuthData();
-            BoundRequestBuilder builder = httpClient.prepareGet(requestUrl);
+            String remoteHostName = serviceNameResolver.resolveHostUri().getHost();
+            AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);
 
-            // Add headers for authentication if any
+            CompletableFuture<Map<String, String>>  authFuture = new CompletableFuture<>();
+
+            // Run an authentication stage first; needed for SASL auth.
             if (authData.hasDataForHttp()) {
-                for (Map.Entry<String, String> header : authData.getHttpHeaders()) {
-                    builder.setHeader(header.getKey(), header.getValue());
-                }
+                authentication.authenticationStage(requestUrl, authData, null, authFuture);
+            } else {
+                authFuture.complete(null);
             }
 
-            final ListenableFuture<Response> responseFuture = builder.setHeader("Accept", "application/json")
-                    .execute(new AsyncCompletionHandler<Response>() {
+            // auth complete, do real request
+            authFuture.whenComplete((respHeaders, ex) -> {
+                if (ex != null) {
+                    log.warn("[{}] Failed to perform http request at authentication stage: {}",
+                        requestUrl, ex.getMessage());
+                    future.completeExceptionally(new PulsarClientException(ex));
+                    return;
+                }
 
-                        @Override
-                        public Response onCompleted(Response response) throws Exception {
-                            return response;
-                        }
+                // auth complete, use a new builder
+                BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
+                    .setHeader("Accept", "application/json");
+
+                if (authData.hasDataForHttp()) {
+                    Set<Entry<String, String>> headers;
+                    try {
+                        headers = authentication.newRequestHeader(requestUrl, authData, respHeaders);
+                    } catch (Exception e) {
+                        log.warn("[{}] Error during HTTP get headers: {}", requestUrl, e.getMessage());
+                        future.completeExceptionally(new PulsarClientException(e));
+                        return;
+                    }
+                    if (headers != null) {
+                        headers.forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue()));
+                    }
+                }
 
-                        @Override
-                        public void onThrowable(Throwable t) {
-                            log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
-                            future.completeExceptionally(new PulsarClientException(t));
-                        }
-                    });
+                builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
+                    if (t != null) {
+                        log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
+                        future.completeExceptionally(new PulsarClientException(t));
+                        return;
+                    }
 
-            responseFuture.addListener(() -> {
-                try {
-                    Response response = responseFuture.get();
-                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
-                        log.warn("[{}] HTTP get request failed: {}", requestUrl, response.getStatusText());
+                    // request did not succeed
+                    if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) {
+                        log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText());
                         Exception e;
-                        if (response.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
-                            e = new NotFoundException("Not found: " + response.getStatusText());
+                        if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+                            e = new NotFoundException("Not found: " + response2.getStatusText());
                         } else {
-                            e = new PulsarClientException("HTTP get request failed: " + response.getStatusText());
+                            e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText());
                         }
                         future.completeExceptionally(e);
                         return;
                     }
 
-                    T data = ObjectMapperFactory.getThreadLocal().readValue(response.getResponseBodyAsBytes(), clazz);
-                    future.complete(data);
-                } catch (Exception e) {
-                    log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
-                    future.completeExceptionally(new PulsarClientException(e));
-                }
-            }, MoreExecutors.directExecutor());
-
+                    try {
+                        T data = ObjectMapperFactory.getThreadLocal().readValue(response2.getResponseBodyAsBytes(), clazz);
+                        future.complete(data);
+                    } catch (Exception e) {
+                        log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
+                        future.completeExceptionally(new PulsarClientException(e));
+                    }
+                });
+            });
         } catch (Exception e) {
-            log.warn("[{}] Failed to get authentication data for lookup: {}", path, e.getMessage());
+            log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
             if (e instanceof PulsarClientException) {
                 future.completeExceptionally(e);
             } else {
@@ -191,8 +206,5 @@ public class HttpClient implements Closeable {
         }
 
         return future;
-
     }
-
-    private static final Logger log = LoggerFactory.getLogger(HttpClient.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
index b2e3c3c..14e3b35 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
@@ -60,6 +60,32 @@ public class SaslConstants {
     // Stand for the start of mutual auth between Client and Broker
     public static final String INIT_PROVIDER_DATA = "isInit";
 
+
+    // Name of the SASL token that contains the auth role
+    public static final String SASL_AUTH_ROLE_TOKEN = "SaslAuthRoleToken";
+    public static final String SASL_AUTH_ROLE_TOKEN_EXPIRED = "SaslAuthRoleTokenExpired";
+
+    /**
+     * HTTP headers used by the SASL client/server during an authentication sequence.
+     */
+    // auth type
+    public static final String SASL_HEADER_TYPE = "SASL-Type";
+    public static final String SASL_TYPE_VALUE = "Kerberos";
+
+    // header name for token auth between client and server
+    public static final String SASL_AUTH_TOKEN = "SASL-Token";
+
+    // header name for state
+    public static final String SASL_HEADER_STATE = "State";
+    // header value for state
+    public static final String SASL_STATE_CLIENT_INIT = "Init";
+    public static final String SASL_STATE_NEGOTIATE = "ING";
+    public static final String SASL_STATE_COMPLETE = "Done";
+    public static final String SASL_STATE_SERVER_CHECK_TOKEN = "ServerCheckToken";
+
+    // header name for the ID the server side uses to track its SASL server
+    public static final String SASL_STATE_SERVER = "SASL-Server-ID";
+
     public static boolean isUsingTicketCache(String configurationEntry) {
         AppConfigurationEntry[] entries = Configuration.getConfiguration()
             .getAppConfigurationEntry(configurationEntry);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index d91cb0f..79cd436 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -246,10 +246,7 @@ public class WebSocketService implements Closeable {
     public boolean isAuthenticationEnabled() {
         if (this.config == null)
             return false;
-        // TODO: isSaslAuthentication used to bypass web resource check.
-        //  will remove it after implementation the support.
-        //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
-        return this.config.isAuthenticationEnabled() && !this.config.isSaslAuthentication();
+        return this.config.isAuthenticationEnabled();
     }
 
     public boolean isAuthorizationEnabled() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
index e6eb158..10e3664 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
@@ -96,7 +96,7 @@ public class WebSocketWebResource {
      *             if not authorized
      */
     protected void validateSuperUserAccess() {
-        if (service().getConfig().isAuthenticationEnabled() && !service().getConfig().isSaslAuthentication()) {
+        if (service().getConfig().isAuthenticationEnabled()) {
             String appId = clientAppId();
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),