You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/19 15:22:21 UTC

[pulsar] branch master updated: kerberos: authentication between client and broker (#3821)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7064285  kerberos: authentication between client and broker (#3821)
7064285 is described below

commit 7064285c31df1a0352d2d4af84801c312b18c1f7
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Tue Mar 19 23:22:16 2019 +0800

    kerberos: authentication between client and broker (#3821)
    
    Fixes #3652
    
    **Motivation**
    Currently both Zookeeper and BookKeeper could be secured by using Kerberos, It would be good to support Kerberos in Pulsar Broker and Client.
    This is the sub-task for issue in #3491 to support Kerberos in Pulsar Broker and Client.
    Will add proxy and web resource support in following prs.
    The Kerberos authentication is similar to that in Zookeeper and BookKeeper, which leverage SASL and GSSAPI, so reused some of the code there.
    PR #3658 is the first version of PR before #3677 .
    
    **Changes**
    provide both client and broker side support for authentication api;
    add unit test.
---
 conf/broker.conf                                   |  18 +-
 pom.xml                                            |   3 +
 pulsar-broker-auth-sasl/pom.xml                    | 102 +++++
 .../authentication/AuthenticationProviderSasl.java | 113 ++++++
 .../broker/authentication/PulsarSaslServer.java    | 203 ++++++++++
 .../SaslAuthenticationDataSource.java              |  62 +++
 .../authentication/SaslAuthenticationState.java    |  66 ++++
 .../pulsar/broker/authentication/MiniKdc.java      | 347 +++++++++++++++++
 .../authentication/SaslAuthenticateTest.java       | 220 +++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  34 +-
 .../authentication/AuthenticationDataSource.java   |   6 +-
 .../authentication/AuthenticationProvider.java     |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  14 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  13 +-
 .../org/apache/pulsar/broker/web/WebService.java   |   5 +-
 .../client/api/AuthenticationDataProvider.java     |   9 +-
 pulsar-client-auth-sasl/pom.xml                    |  45 +++
 .../client/impl/auth/AuthenticationSasl.java       | 138 +++++++
 .../pulsar/client/impl/auth/PulsarSaslClient.java  | 153 ++++++++
 .../impl/auth/SaslAuthenticationDataProvider.java  |  57 +++
 .../common/sasl/JAASCredentialsContainer.java      | 107 +++++
 .../apache/pulsar/common/sasl/KerberosName.java    | 433 +++++++++++++++++++++
 .../apache/pulsar/common/sasl/SaslConstants.java   |  96 +++++
 .../pulsar/common/sasl/TGTRefreshThread.java       | 274 +++++++++++++
 .../apache/pulsar/websocket/WebSocketService.java  |   5 +-
 .../websocket/admin/WebSocketWebResource.java      |   2 +-
 26 files changed, 2506 insertions(+), 21 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 0d17785..fb5e1e8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -251,7 +251,7 @@ authenticateOriginalAuthData=false
 # Deprecated - Use webServicePortTls and brokerServicePortTls instead
 tlsEnabled=false
 
-# Tls cert refresh duration in seconds (set 0 to check on every new connection) 
+# Tls cert refresh duration in seconds (set 0 to check on every new connection)
 tlsCertRefreshCheckDurationSec=300
 
 # Path for the TLS certificate file
@@ -343,6 +343,22 @@ tokenPublicKey=
 # The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank)
 tokenAuthClaim=
 
+### --- SASL Authentication Provider --- ###
+
+# Whether Use SASL Authentication or not.
+# TODO: used to bypass web resource check. will remove it after implementation the support.
+# github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+isSaslAuthentication=
+
+# This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.
+# Default value: `SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT`, which is ".*pulsar.*",
+# so only clients whose id contains 'pulsar' are allowed to connect.
+saslJaasClientAllowedIds=
+
+# Service Principal, for login context name.
+# Default value `SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME`, which is "Broker".
+saslJaasBrokerSectionName=
+
 ### --- BookKeeper Client --- ###
 
 # Authentication plugin to use when connecting to bookies
diff --git a/pom.xml b/pom.xml
index 80ab4c1..f3731b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,8 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-log4j2-appender</module>
     <module>pulsar-sql</module>
     <module>dashboard</module>
+    <module>pulsar-broker-auth-sasl</module>
+    <module>pulsar-client-auth-sasl</module>
 
     <!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 -->
     <module>jclouds-shaded</module>
@@ -196,6 +198,7 @@ flexible messaging model and an intuitive client API.</description>
     <cassandra.version>3.6.0</cassandra.version>
     <disruptor.version>3.4.0</disruptor.version>
     <testcontainers.version>1.10.5</testcontainers.version>
+    <kerby.version>1.1.1</kerby.version>
 
     <!-- Plugin dependencies -->
     <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
diff --git a/pulsar-broker-auth-sasl/pom.xml b/pulsar-broker-auth-sasl/pom.xml
new file mode 100644
index 0000000..cc19661
--- /dev/null
+++ b/pulsar-broker-auth-sasl/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-broker-auth-sasl</artifactId>
+  <packaging>jar</packaging>
+  <description>SASL authentication plugin for broker</description>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerby-config</artifactId>
+      <version>${kerby.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerb-simplekdc</artifactId>
+      <version>${kerby.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-zookeeper-utils</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-auth-sasl</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
new file mode 100644
index 0000000..d11a0e1
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_BROKER_SECTION_NAME;
+import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
+import static org.apache.pulsar.common.sasl.SaslConstants.KINIT_COMMAND;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.login.LoginException;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.sasl.JAASCredentialsContainer;
+import org.apache.pulsar.common.sasl.SaslConstants;
+
+@Slf4j
+public class AuthenticationProviderSasl implements AuthenticationProvider {
+
+    private Pattern allowedIdsPattern;
+    private Map<String, String> configuration;
+
+    private JAASCredentialsContainer jaasCredentialsContainer;
+    private String loginContextName;
+
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        this.configuration = Maps.newHashMap();
+        final String allowedIdsPatternRegExp = config.getSaslJaasClientAllowedIds();
+        configuration.put(JAAS_CLIENT_ALLOWED_IDS, allowedIdsPatternRegExp);
+        configuration.put(JAAS_BROKER_SECTION_NAME, config.getSaslJaasBrokerSectionName());
+        configuration.put(KINIT_COMMAND, config.getKinitCommand());
+
+        try {
+            this.allowedIdsPattern = Pattern.compile(allowedIdsPatternRegExp);
+        } catch (PatternSyntaxException error) {
+            log.error("Invalid regular expression for id " + allowedIdsPatternRegExp, error);
+            throw new IOException(error);
+        }
+
+        loginContextName = config.getSaslJaasBrokerSectionName();
+        if (jaasCredentialsContainer == null) {
+            log.info("JAAS loginContext is: {}." , loginContextName);
+            try {
+                jaasCredentialsContainer = new JAASCredentialsContainer(
+                    loginContextName,
+                    new PulsarSaslServer.SaslServerCallbackHandler(allowedIdsPattern),
+                    configuration);
+            } catch (LoginException e) {
+                log.error("JAAS login in broker failed: {}" , e);
+                throw new IOException(e);
+            }
+        }
+    }
+
+    @Override
+    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+        if (authData instanceof SaslAuthenticationDataSource) {
+            return ((SaslAuthenticationDataSource)authData).getAuthorizationID();
+        } else {
+            throw new AuthenticationException("Not support authDataSource type, expect sasl.");
+        }
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return SaslConstants.AUTH_METHOD_NAME;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public AuthenticationState newAuthState(AuthData authData,
+                                            SocketAddress remoteAddress,
+                                            SSLSession sslSession) throws AuthenticationException {
+        try {
+            new PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern);
+            return new SaslAuthenticationState(
+                new SaslAuthenticationDataSource(
+                    new PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern)));
+        } catch (Throwable t) {
+            log.error("Failed create sasl auth state: {}" , t);
+            throw new AuthenticationException(t.getMessage());
+        }
+    }
+}
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
new file mode 100644
index 0000000..f7e7eb9
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.regex.Pattern;
+
+import javax.naming.AuthenticationException;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.sasl.KerberosName;
+import org.apache.pulsar.common.sasl.SaslConstants;
+
+
+/**
+ * Server side Sasl implementation.
+ */
+@Slf4j
+public class PulsarSaslServer {
+
+    private final SaslServer saslServer;
+    private final Pattern allowedIdsPattern;
+    private final Subject serverSubject;
+    private static final String GSSAPI = "GSSAPI";
+
+    public PulsarSaslServer(Subject subject, Pattern allowedIdsPattern)
+        throws IOException, LoginException {
+        this.serverSubject = subject;
+        this.allowedIdsPattern = allowedIdsPattern;
+        saslServer = createSaslServer(serverSubject);
+    }
+
+    private SaslServer createSaslServer(final Subject subject)
+        throws IOException {
+        SaslServerCallbackHandler callbackHandler = new SaslServerCallbackHandler(allowedIdsPattern);
+        if (subject.getPrincipals().size() > 0) {
+            try {
+                final Object[] principals = subject.getPrincipals().toArray();
+                final Principal servicePrincipal = (Principal) principals[0];
+                if (log.isDebugEnabled()) {
+                    log.debug("Authentication will use SASL/JAAS/Kerberos, servicePrincipal is {}", servicePrincipal);
+                }
+
+                // e.g. servicePrincipalNameAndHostname := "broker/myhost.foo.com@EXAMPLE.COM"
+                final String servicePrincipalNameAndHostname = servicePrincipal.getName();
+                int indexOf = servicePrincipalNameAndHostname.indexOf("/");
+
+                // e.g. serviceHostnameAndKerbDomain := "myhost.foo.com@EXAMPLE.COM"
+                final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf + 1,
+                    servicePrincipalNameAndHostname.length());
+                int indexOfAt = serviceHostnameAndKerbDomain.indexOf("@");
+
+                // Handle Kerberos Service as well as User Principal Names
+                final String servicePrincipalName, serviceHostname;
+                if (indexOf > 0) {
+                    // e.g. servicePrincipalName := "pulsar"
+                    servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOf);
+                    // e.g. serviceHostname := "myhost.foo.com"
+                    serviceHostname = serviceHostnameAndKerbDomain.substring(0, indexOfAt);
+                } else {
+                    servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOfAt);
+                    serviceHostname = null;
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("serviceHostname is '{}', servicePrincipalName is '{}', SASL mechanism(mech) is '{}'.",
+                        serviceHostname, servicePrincipalName, GSSAPI);
+                }
+
+                try {
+                    return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+                            @Override
+                            public SaslServer run() {
+                                try {
+                                    SaslServer saslServer;
+                                    saslServer = Sasl.createSaslServer(GSSAPI, servicePrincipalName, serviceHostname,
+                                        null, callbackHandler);
+                                    return saslServer;
+                                } catch (SaslException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        }
+                    );
+                } catch (PrivilegedActionException e) {
+                    throw new SaslException("error on GSSAPI boot", e.getCause());
+                }
+            } catch (IndexOutOfBoundsException e) {
+                throw new SaslException("error on GSSAPI boot", e);
+            }
+        } else {
+            String errorMessage = "Authentication use SASL/JAAS/GSSAPI but server not have Principals";
+            log.error(errorMessage);
+            throw new SaslException(errorMessage);
+        }
+    }
+
+    public boolean isComplete() {
+        return saslServer.isComplete();
+    }
+
+    /**
+     * Reports the authorization ID in effect for the client of this
+     * session.
+     * This method can only be called if isComplete() returns true.
+     * @return The authorization ID of the client.
+     * @exception IllegalStateException if this authentication session has not completed
+     */
+    public String getAuthorizationID() throws IllegalStateException {
+        return saslServer.getAuthorizationID();
+    }
+
+    public AuthData response(AuthData token) throws AuthenticationException {
+        try {
+            return AuthData.of(saslServer.evaluateResponse(token.getBytes()));
+        } catch (SaslException e) {
+            log.error("response: Failed to evaluate client token:", e);
+            throw new AuthenticationException(e.getMessage());
+        }
+    }
+
+    static class SaslServerCallbackHandler implements CallbackHandler {
+        Pattern allowedIdsPattern;
+
+        public SaslServerCallbackHandler(Pattern pattern) {
+            this.allowedIdsPattern = pattern;
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof AuthorizeCallback) {
+                    handleAuthorizeCallback((AuthorizeCallback) callback);
+                } else {
+                    throw new UnsupportedCallbackException(callback, "Unrecognized SASL GSSAPI Server Callback.");
+                }
+            }
+        }
+
+        private void handleAuthorizeCallback(AuthorizeCallback ac) {
+            String authenticationID = ac.getAuthenticationID();
+            String authorizationID = ac.getAuthorizationID();
+            if (!authenticationID.equals(authorizationID)) {
+                ac.setAuthorized(false);
+                log.info("Forbidden access to client: authenticationID: {} is different from authorizationID: {}",
+                    authenticationID, authorizationID);
+                return;
+            }
+            if (!allowedIdsPattern.matcher(authenticationID).matches()) {
+                ac.setAuthorized(false);
+                log.info("Forbidden access to client: authenticationID {}, is not allowed (see {} property).",
+                    authenticationID, SaslConstants.JAAS_CLIENT_ALLOWED_IDS);
+                return;
+            }
+
+            ac.setAuthorized(true);
+            log.info("Successfully authenticated client: authenticationID: {};  authorizationID: {}.",
+                authenticationID, authorizationID);
+
+            KerberosName kerberosName = new KerberosName(authenticationID);
+            try {
+                StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
+                userNameBuilder.append("/").append(kerberosName.getHostName());
+                userNameBuilder.append("@").append(kerberosName.getRealm());
+
+                log.info("Setting authorizedID: {} ", userNameBuilder);
+                ac.setAuthorizedID(userNameBuilder.toString());
+            } catch (IOException e) {
+                log.error("Failed to set name based on Kerberos authentication rules.");
+            }
+        }
+    }
+}
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationDataSource.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationDataSource.java
new file mode 100644
index 0000000..2fe6546
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationDataSource.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import javax.naming.AuthenticationException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.api.AuthData;
+
+@Slf4j
+public class SaslAuthenticationDataSource implements AuthenticationDataSource {
+    private static final long serialVersionUID = 1L;
+
+    // server side token data, that will passed to sasl client side.
+    protected AuthData serverSideToken;
+    private PulsarSaslServer pulsarSaslServer;
+
+    public SaslAuthenticationDataSource(PulsarSaslServer saslServer) {
+        this.pulsarSaslServer = saslServer;
+    }
+
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    @Override
+    public AuthData authenticate(AuthData data) throws AuthenticationException {
+        serverSideToken = pulsarSaslServer.response(data);
+        return serverSideToken;
+    }
+
+    public boolean isComplete() {
+        return this.pulsarSaslServer.isComplete();
+    }
+
+    public String getAuthorizationID() {
+        return pulsarSaslServer.getAuthorizationID();
+    }
+
+    // TODO: for http support. github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+    /* default boolean hasDataFromHttp() {
+        return false;
+    }*/
+
+}
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationState.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationState.java
new file mode 100644
index 0000000..d345d81
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/SaslAuthenticationState.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.pulsar.common.api.AuthData;
+
+/**
+ * Interface for authentication state.
+ *
+ * It is basically holding the the authentication state.
+ * It tell broker whether the authentication is completed or not,
+ */
+public class SaslAuthenticationState implements AuthenticationState {
+    private SaslAuthenticationDataSource authenticationDataSource;
+
+    public SaslAuthenticationState(AuthenticationDataSource authenticationDataSource) {
+        checkArgument(authenticationDataSource instanceof SaslAuthenticationDataSource);
+        this.authenticationDataSource = (SaslAuthenticationDataSource)authenticationDataSource;
+    }
+
+    @Override
+    public String getAuthRole() {
+        return authenticationDataSource.getAuthorizationID();
+    }
+
+    @Override
+    public AuthenticationDataSource getAuthDataSource() {
+        return authenticationDataSource;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return authenticationDataSource.isComplete();
+    }
+
+    /**
+     * Returns null if authentication has completed, and no auth data is required to send back to client.
+     * Do auth and Returns the auth data back to client, if authentication has not completed.
+     */
+    public AuthData authenticate(AuthData authData) throws AuthenticationException {
+        return authenticationDataSource.authenticate(authData);
+    }
+
+
+}
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/MiniKdc.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/MiniKdc.java
new file mode 100644
index 0000000..1ec60fc
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/MiniKdc.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kerby.kerberos.kerb.KrbException;
+import org.apache.kerby.kerberos.kerb.server.KdcConfigKey;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+import org.apache.kerby.util.IOUtil;
+import org.apache.kerby.util.NetworkUtil;
+
+/**
+ * Mini KDC based on Apache Directory Server that can be embedded in testcases
+ * or used from command line as a standalone KDC.
+ *
+ * <p><b>From within testcases:</b>
+ *
+ * <p>MiniKdc sets one System property when started and un-set when stopped:
+ * <ul>
+ *   <li>sun.security.krb5.debug: set to the debug value provided in the
+ *   configuration</li>
+ * </ul>
+ * Because of this, multiple MiniKdc instances cannot be started in parallel.
+ * For example, running testcases in parallel that start a KDC each. To
+ * accomplish this a single MiniKdc should be used for all testcases running
+ * in parallel.
+ *
+ * <p>MiniKdc default configuration values are:
+ * <ul>
+ *   <li>org.name=EXAMPLE (used to create the REALM)</li>
+ *   <li>org.domain=COM (used to create the REALM)</li>
+ *   <li>kdc.bind.address=localhost</li>
+ *   <li>kdc.port=0 (ephemeral port)</li>
+ *   <li>instance=DefaultKrbServer</li>
+ *   <li>max.ticket.lifetime=86400000 (1 day)</li>
+ *   <li>max.renewable.lifetime=604800000 (7 days)</li>
+ *   <li>transport=TCP</li>
+ *   <li>debug=false</li>
+ * </ul>
+ * The generated krb5.conf forces TCP connections.
+ * This code is originally from HDFS, see the file name MiniKdc there
+ * in case of bug fixing, history, etc.
+ * https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java
+ */
+@Slf4j
+public class MiniKdc {
+
+    public static final String JAVA_SECURITY_KRB5_CONF =
+            "java.security.krb5.conf";
+    public static final String SUN_SECURITY_KRB5_DEBUG =
+            "sun.security.krb5.debug";
+
+    public static final String ORG_NAME = "org.name";
+    public static final String ORG_DOMAIN = "org.domain";
+    public static final String KDC_BIND_ADDRESS = "kdc.bind.address";
+    public static final String KDC_PORT = "kdc.port";
+    public static final String INSTANCE = "instance";
+    public static final String MAX_TICKET_LIFETIME = "max.ticket.lifetime";
+    public static final String MAX_RENEWABLE_LIFETIME = "max.renewable.lifetime";
+    public static final String TRANSPORT = "transport";
+    public static final String DEBUG = "debug";
+
+    private static final Set<String> PROPERTIES = new HashSet<String>();
+    private static final Properties DEFAULT_CONFIG = new Properties();
+
+    static {
+        PROPERTIES.add(ORG_NAME);
+        PROPERTIES.add(ORG_DOMAIN);
+        PROPERTIES.add(KDC_BIND_ADDRESS);
+        PROPERTIES.add(KDC_BIND_ADDRESS);
+        PROPERTIES.add(KDC_PORT);
+        PROPERTIES.add(INSTANCE);
+        PROPERTIES.add(TRANSPORT);
+        PROPERTIES.add(MAX_TICKET_LIFETIME);
+        PROPERTIES.add(MAX_RENEWABLE_LIFETIME);
+
+        DEFAULT_CONFIG.setProperty(KDC_BIND_ADDRESS, "localhost");
+        DEFAULT_CONFIG.setProperty(KDC_PORT, "0");
+        DEFAULT_CONFIG.setProperty(INSTANCE, "DefaultKrbServer");
+        DEFAULT_CONFIG.setProperty(ORG_NAME, "EXAMPLE");
+        DEFAULT_CONFIG.setProperty(ORG_DOMAIN, "COM");
+        DEFAULT_CONFIG.setProperty(TRANSPORT, "TCP");
+        DEFAULT_CONFIG.setProperty(MAX_TICKET_LIFETIME, "86400000");
+        DEFAULT_CONFIG.setProperty(MAX_RENEWABLE_LIFETIME, "604800000");
+        DEFAULT_CONFIG.setProperty(DEBUG, "false");
+    }
+
+    /**
+     * Convenience method that returns MiniKdc default configuration.
+     *
+     * <p>The returned configuration is a copy, it can be customized before using
+     * it to create a MiniKdc.
+     * @return a MiniKdc default configuration.
+     */
+    public static Properties createConf() {
+        return (Properties) DEFAULT_CONFIG.clone();
+    }
+
+    private Properties conf;
+    private SimpleKdcServer simpleKdc;
+    private int port;
+    private String realm;
+    private File workDir;
+    private File krb5conf;
+    private String transport;
+    private boolean krb5Debug;
+
+    public void setTransport(String transport) {
+        this.transport = transport;
+    }
+
+    /**
+     * Creates a MiniKdc.
+     *
+     * @param conf MiniKdc configuration.
+     * @param workDir working directory, it should be the build directory. Under
+     * this directory an ApacheDS working directory will be created, this
+     * directory will be deleted when the MiniKdc stops.
+     * @throws Exception thrown if the MiniKdc could not be created.
+     */
+    public MiniKdc(Properties conf, File workDir) throws Exception {
+        if (!conf.keySet().containsAll(PROPERTIES)) {
+            Set<String> missingProperties = new HashSet<String>(PROPERTIES);
+            missingProperties.removeAll(conf.keySet());
+            throw new IllegalArgumentException("Missing configuration properties: "
+                    + missingProperties);
+        }
+        this.workDir = new File(workDir, Long.toString(System.currentTimeMillis()));
+        if (!this.workDir.exists()
+                && !this.workDir.mkdirs()) {
+            throw new RuntimeException("Cannot create directory " + this.workDir);
+        }
+        log.info("Configuration:");
+        log.info("---------------------------------------------------------------");
+        for (Map.Entry<?, ?> entry : conf.entrySet()) {
+            log.info("  {}: {}", entry.getKey(), entry.getValue());
+        }
+        log.info("---------------------------------------------------------------");
+        this.conf = conf;
+        port = Integer.parseInt(conf.getProperty(KDC_PORT));
+        String orgName = conf.getProperty(ORG_NAME);
+        String orgDomain = conf.getProperty(ORG_DOMAIN);
+        realm = orgName.toUpperCase(Locale.ENGLISH) + "."
+                + orgDomain.toUpperCase(Locale.ENGLISH);
+    }
+
+    /**
+     * Returns the port of the MiniKdc.
+     *
+     * @return the port of the MiniKdc.
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Returns the host of the MiniKdc.
+     *
+     * @return the host of the MiniKdc.
+     */
+    public String getHost() {
+        return conf.getProperty(KDC_BIND_ADDRESS);
+    }
+
+    /**
+     * Returns the realm of the MiniKdc.
+     *
+     * @return the realm of the MiniKdc.
+     */
+    public String getRealm() {
+        return realm;
+    }
+
+    public File getKrb5conf() {
+        krb5conf = new File(System.getProperty(JAVA_SECURITY_KRB5_CONF));
+        return krb5conf;
+    }
+
+    /**
+     * Starts the MiniKdc.
+     *
+     * @throws Exception thrown if the MiniKdc could not be started.
+     */
+    public synchronized void start() throws Exception {
+        if (simpleKdc != null) {
+            throw new RuntimeException("Already started");
+        }
+        simpleKdc = new SimpleKdcServer();
+        prepareKdcServer();
+        simpleKdc.init();
+        resetDefaultRealm();
+        simpleKdc.start();
+        log.info("MiniKdc stated.");
+    }
+
+    private void resetDefaultRealm() throws IOException {
+        InputStream templateResource = new FileInputStream(
+                getKrb5conf().getAbsolutePath());
+        String content = IOUtil.readInput(templateResource);
+        content = content.replaceAll("default_realm = .*\n",
+                "default_realm = " + getRealm() + "\n");
+        IOUtil.writeFile(content, getKrb5conf());
+    }
+
+    private void prepareKdcServer() throws Exception {
+        // transport
+        simpleKdc.setWorkDir(workDir);
+        simpleKdc.setKdcHost(getHost());
+        simpleKdc.setKdcRealm(realm);
+        if (transport == null) {
+            transport = conf.getProperty(TRANSPORT);
+        }
+        if (port == 0) {
+            port = NetworkUtil.getServerPort();
+        }
+        if (transport != null) {
+            if (transport.trim().equals("TCP")) {
+                simpleKdc.setKdcTcpPort(port);
+                simpleKdc.setAllowUdp(false);
+            } else if (transport.trim().equals("UDP")) {
+                simpleKdc.setKdcUdpPort(port);
+                simpleKdc.setAllowTcp(false);
+            } else {
+                throw new IllegalArgumentException("Invalid transport: " + transport);
+            }
+        } else {
+            throw new IllegalArgumentException("Need to set transport!");
+        }
+        simpleKdc.getKdcConfig().setString(KdcConfigKey.KDC_SERVICE_NAME,
+                conf.getProperty(INSTANCE));
+        if (conf.getProperty(DEBUG) != null) {
+            krb5Debug = getAndSet(SUN_SECURITY_KRB5_DEBUG, conf.getProperty(DEBUG));
+        }
+    }
+
+    /**
+     * Stops the MiniKdc.
+     */
+    public synchronized void stop() {
+        if (simpleKdc != null) {
+            try {
+                simpleKdc.stop();
+            } catch (KrbException e) {
+                e.printStackTrace();
+            } finally {
+                if (conf.getProperty(DEBUG) != null) {
+                    System.setProperty(SUN_SECURITY_KRB5_DEBUG,
+                            Boolean.toString(krb5Debug));
+                }
+            }
+        }
+        delete(workDir);
+        try {
+            // Will be fixed in next Kerby version.
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        log.info("MiniKdc stopped.");
+    }
+
+    private void delete(File f) {
+        if (f.isFile()) {
+            if (!f.delete()) {
+                log.warn("WARNING: cannot delete file " + f.getAbsolutePath());
+            }
+        } else {
+            for (File c: f.listFiles()) {
+                delete(c);
+            }
+            if (!f.delete()) {
+                log.warn("WARNING: cannot delete directory " + f.getAbsolutePath());
+            }
+        }
+    }
+
+    /**
+     * Creates a principal in the KDC with the specified user and password.
+     *
+     * @param principal principal name, do not include the domain.
+     * @param password password.
+     * @throws Exception thrown if the principal could not be created.
+     */
+    public synchronized void createPrincipal(String principal, String password)
+            throws Exception {
+        simpleKdc.createPrincipal(principal, password);
+    }
+
+    /**
+     * Creates multiple principals in the KDC and adds them to a keytab file.
+     *
+     * @param keytabFile keytab file to add the created principals.
+     * @param principals principals to add to the KDC, do not include the domain.
+     * @throws Exception thrown if the principals or the keytab file could not be
+     * created.
+     */
+    public synchronized void createPrincipal(File keytabFile,
+                                             String ... principals)
+            throws Exception {
+        simpleKdc.createPrincipals(principals);
+        if (keytabFile.exists() && !keytabFile.delete()) {
+            log.error("Failed to delete keytab file: " + keytabFile);
+        }
+        for (String principal : principals) {
+            simpleKdc.getKadmin().exportKeytab(keytabFile, principal);
+        }
+    }
+
+    /**
+     * Set the System property; return the old value for caching.
+     *
+     * @param sysprop property
+     * @param debug true or false
+     * @return the previous value
+     */
+    private boolean getAndSet(String sysprop, String debug) {
+        boolean old = Boolean.getBoolean(sysprop);
+        System.setProperty(sysprop, debug);
+        return old;
+    }
+}
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
new file mode 100644
index 0000000..61d2b04
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.FileWriter;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.login.Configuration;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class SaslAuthenticateTest extends ProducerConsumerBase {
+    public static File kdcDir;
+    public static File kerberosWorkDir;
+
+    private static MiniKdc kdc;
+    private static Properties properties;
+
+    private static String localHostname = "localhost";
+    private static Authentication authSasl;
+
+    @BeforeClass
+    public static void startMiniKdc() throws Exception {
+        kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
+        kerberosWorkDir = Files.createTempDirectory("test-kerberos-work-dir").toFile();
+
+        properties = MiniKdc.createConf();
+        kdc = new MiniKdc(properties, kdcDir);
+        kdc.start();
+
+        String principalServerNoRealm = "broker/" + localHostname;
+        String principalServer = "broker/" + localHostname + "@" + kdc.getRealm();
+        log.info("principalServer: " + principalServer);
+        String principalClientNoRealm = "client/" + localHostname;
+        String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
+        log.info("principalClient: " + principalClient);
+
+        File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
+        kdc.createPrincipal(keytabClient, principalClientNoRealm);
+
+        File keytabServer = new File(kerberosWorkDir, "pulsarbroker.keytab");
+        kdc.createPrincipal(keytabServer, principalServerNoRealm);
+
+        File jaasFile = new File(kerberosWorkDir, "jaas.properties");
+        try (FileWriter writer = new FileWriter(jaasFile)) {
+            writer.write("\n"
+                + "PulsarBroker {\n"
+                + "  com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+                + "  useKeyTab=true\n"
+                + "  keyTab=\"" + keytabServer.getAbsolutePath() + "\n"
+                + "  storeKey=true\n"
+                + "  useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
+                + "  principal=\"" + principalServer + "\";\n"
+                + "};\n"
+                + "\n"
+                + "\n"
+                + "\n"
+                + "PulsarClient {\n"
+                + "  com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+                + "  useKeyTab=true\n"
+                + "  keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
+                + "  storeKey=true\n"
+                + "  useTicketCache=false\n"
+                + "  principal=\"" + principalClient + "\";\n"
+                + "};\n"
+            );
+        }
+
+        File krb5file = new File(kerberosWorkDir, "krb5.properties");
+        try (FileWriter writer = new FileWriter(krb5file)) {
+            String conf = "[libdefaults]\n"
+                + " default_realm = " + kdc.getRealm() + "\n"
+                + " udp_preference_limit = 1\n" // force use TCP
+                + "\n"
+                + "\n"
+                + "[realms]\n"
+                + " " + kdc.getRealm() + "  = {\n"
+                + "  kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+                + " }";
+            writer.write(conf);
+            log.info("krb5.properties:\n" + conf);
+        }
+
+        System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
+        System.setProperty("java.security.krb5.properties", krb5file.getAbsolutePath());
+        Configuration.getConfiguration().refresh();
+
+        // Client config
+        Map<String, String> clientSaslConfig = Maps.newHashMap();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        log.info("set client jaas section name: PulsarClient");
+        authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);
+        log.info("created AuthenticationSasl");
+    }
+
+    @AfterClass
+    public static void stopMiniKdc() {
+        System.clearProperty("java.security.auth.login.config");
+        System.clearProperty("java.security.krb5.properties");
+        if (kdc != null) {
+            kdc.stop();
+        }
+        FileUtils.deleteQuietly(kdcDir);
+        FileUtils.deleteQuietly(kerberosWorkDir);
+        Assert.assertFalse(kdcDir.exists());
+        Assert.assertFalse(kerberosWorkDir.exists());
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        log.info("-- {} --, start at host: {}", methodName, localHostname);
+        isTcpLookup = true;
+        conf.setAdvertisedAddress(localHostname);
+        conf.setAuthenticationEnabled(true);
+        conf.setSaslAuthentication(true);
+        conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+        conf.setSaslJaasBrokerSectionName("PulsarBroker");
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderSasl.class.getName());
+        conf.setAuthenticationProviders(providers);
+        conf.setClusterName("test");
+
+        super.init();
+
+        lookupUrl = new URI("broker://" + "localhost" + ":" + BROKER_PORT);
+        pulsarClient = PulsarClient.builder()
+            .serviceUrl(lookupUrl.toString())
+            .statsInterval(0, TimeUnit.SECONDS)
+            .authentication(authSasl).build();
+
+        log.info("-- {} --, end.", methodName);
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    // Test could verify with kerberos configured.
+    @Test
+    public void testProducerAndConsumerPassed() throws Exception {
+        log.info("-- {} -- start", methodName);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topic("persistent://my-property/my-ns/my-topic")
+            .subscriptionName("my-subscriber-name")
+            .subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+            .topic("persistent://my-property/my-ns/my-topic")
+            .enableBatching(false);
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            log.info("Produced message: [{}]", message);
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.info("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        log.info("-- {} -- end", methodName);
+    }
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0fbe155..da2b098 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.configuration.Category;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.sasl.SaslConstants;
 
 /**
  * Pulsar service configuration object.
@@ -76,6 +77,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @Category
     private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider";
     @Category
+    private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
+    @Category
     private static final String CATEGORY_HTTP = "HTTP";
 
     /***** --- pulsar configuration --- ****/
@@ -590,6 +593,35 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String anonymousUserRole = null;
 
+
+    @FieldContext(
+        category = CATEGORY_SASL_AUTH,
+        doc = "Whether Use SASL Authentication or not"
+    )
+    // TODO: isSaslAuthentication used to bypass web resource check.
+    //  will remove it after implementation the support.
+    //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+    private boolean isSaslAuthentication = false;
+
+    @FieldContext(
+        category = CATEGORY_SASL_AUTH,
+        doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
+            + " Default value is: \".*pulsar.*\", so only clients whose id contains 'pulsar' are allowed to connect."
+    )
+    private String saslJaasClientAllowedIds = SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT;
+
+    @FieldContext(
+        category = CATEGORY_SASL_AUTH,
+        doc = "Service Principal, for login context name. Default value is \"Broker\"."
+    )
+    private String saslJaasBrokerSectionName = SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;
+
+    @FieldContext(
+        category = CATEGORY_SASL_AUTH,
+        doc = "kerberos kinit command."
+    )
+    private String kinitCommand = "/usr/bin/kinit";
+
     /**** --- BookKeeper Client --- ****/
     @FieldContext(
         category = CATEGORY_STORAGE_BK,
@@ -1202,4 +1234,4 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public Optional<Integer> getWebServicePortTls() {
         return Optional.ofNullable(webServicePortTls);
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
index fcc6dda..b72b99b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.broker.authentication;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.cert.Certificate;
+import javax.naming.AuthenticationException;
 import org.apache.pulsar.common.api.AuthData;
 
 /**
@@ -103,8 +103,8 @@ public interface AuthenticationDataSource {
      * Evaluate and challenge the data that passed in, and return processed data back.
      * It is used for mutual authentication like SASL.
      */
-    default AuthData authenticate(AuthData data) throws IOException {
-        throw new UnsupportedOperationException();
+    default AuthData authenticate(AuthData data) throws AuthenticationException {
+        throw new AuthenticationException("Not supported");
     }
 
     /*
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
index 755fe86..f9a2d03 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
@@ -69,7 +69,7 @@ public interface AuthenticationProvider extends Closeable {
     default AuthenticationState newAuthState(AuthData authData,
                                              SocketAddress remoteAddress,
                                              SSLSession sslSession)
-        throws AuthenticationException{
+        throws AuthenticationException {
         return new OneStageAuthenticationState(authData, remoteAddress, sslSession, this);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bcd739f..abed070 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,6 +100,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.sasl.SaslConstants;
 import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -220,14 +221,17 @@ public class ServerCnx extends PulsarHandler {
     }
 
     /*
-     * If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce
+     * If authentication and authorization is enabled(and not sasl) and if the authRole is one of proxyRoles we want to enforce
      * - the originalPrincipal is given while connecting
      * - originalPrincipal is not blank
      * - originalPrincipal is not a proxy principal
      */
+    //TODO: for sasl proxy.
+    // github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
     private boolean invalidOriginalPrincipal(String originalPrincipal) {
-        return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled() && proxyRoles.contains(authRole)
-                && (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal)));
+        return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()
+            && !isSaslAuthenticationMethod()
+            && proxyRoles.contains(authRole) && (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal)));
     }
 
     // ////
@@ -1498,6 +1502,10 @@ public class ServerCnx extends PulsarHandler {
         }
     }
 
+    private boolean isSaslAuthenticationMethod(){
+        return authMethod.equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index dc631d6..824be7a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -151,12 +151,12 @@ public abstract class PulsarWebResource {
             // Request has come from a proxy
             if (StringUtils.isBlank(originalPrincipal)) {
                 log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
-                throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");               
+                throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
             }
             if (proxyRoles.contains(originalPrincipal)) {
                 log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
-                throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be a proxy role");           
-            } 
+                throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+            }
         }
     }
 
@@ -167,7 +167,10 @@ public abstract class PulsarWebResource {
      *             if not authorized
      */
     protected void validateSuperUserAccess() {
-        if (config().isAuthenticationEnabled()) {
+        // TODO: isSaslAuthentication used to bypass web resource check.
+        //  will remove it after implementation the support.
+        //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+        if (config().isAuthenticationEnabled() && !config().isSaslAuthentication()) {
             String appId = clientAppId();
             if(log.isDebugEnabled()) {
                 log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),
@@ -243,7 +246,7 @@ public abstract class PulsarWebResource {
             throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
         }
 
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
+        if (pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication() && pulsar.getConfiguration().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId)) {
                 throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 3ae55bc..ef5c3c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -133,7 +133,10 @@ public class WebService implements AutoCloseable {
             });
         }
 
-        if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
+        // TODO: isSaslAuthentication used to bypass web resource check.
+        //  will remove it after implementation the support.
+        //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+        if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication()) {
             FilterHolder filter = new FilterHolder(new AuthenticationFilter(
                                                            pulsar.getBrokerService().getAuthenticationService()));
             context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index ecf3d3f..9a0df5d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -18,16 +18,17 @@
  */
 package org.apache.pulsar.client.api;
 
-import java.io.IOException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.Serializable;
 import java.security.PrivateKey;
 import java.security.cert.Certificate;
 import java.util.Map;
 import java.util.Set;
+
 import javax.naming.AuthenticationException;
-import org.apache.pulsar.common.api.AuthData;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import org.apache.pulsar.common.api.AuthData;
 
 /**
  * Interface for accessing data which are used in variety of authentication schemes on client side
@@ -119,7 +120,7 @@ public interface AuthenticationDataProvider extends Serializable {
      *
      * Mainly used for mutual authentication like sasl.
      */
-    default AuthData authenticate(AuthData data) throws IOException, AuthenticationException {
+    default AuthData authenticate(AuthData data) throws AuthenticationException {
         byte[] bytes = (hasDataFromCommand() ? this.getCommandData() : "").getBytes(UTF_8);
         return AuthData.of(bytes);
     }
diff --git a/pulsar-client-auth-sasl/pom.xml b/pulsar-client-auth-sasl/pom.xml
new file mode 100644
index 0000000..27288ac
--- /dev/null
+++ b/pulsar-client-auth-sasl/pom.xml
@@ -0,0 +1,45 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-auth-sasl</artifactId>
+  <packaging>jar</packaging>
+  <description>SASL authentication plugin for java client</description>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
new file mode 100644
index 0000000..7b98be3
--- /dev/null
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -0,0 +1,138 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.AuthenticationUtil;
+import org.apache.pulsar.client.impl.auth.PulsarSaslClient.ClientCallbackHandler;
+import org.apache.pulsar.common.sasl.JAASCredentialsContainer;
+import org.apache.pulsar.common.sasl.SaslConstants;
+
+/**
+ * Authentication provider for SASL based authentication.
+ *
+ * SASL need config files through JVM parameter:
+ *   a jaas.conf, which is set by `-Djava.security.auth.login.config=/dir/jaas.conf`
+ *   for Kerberos a krb5.conf, which is set by `-Djava.security.krb5.conf=/dir/krb5.conf`
+ */
+@Slf4j
+public class AuthenticationSasl implements Authentication, EncodedAuthenticationParameterSupport {
+    private static final long serialVersionUID = 1L;
+    // this is a static object that shares amongst client.
+    static private JAASCredentialsContainer jaasCredentialsContainer;
+    static private volatile boolean initializedJAAS = false;
+
+    private Map<String, String> configuration;
+    private String loginContextName;
+
+    public AuthenticationSasl() {
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return SaslConstants.AUTH_METHOD_NAME;
+    }
+
+    @Override
+    public AuthenticationDataProvider getAuthData(String brokerHostName) throws PulsarClientException {
+        // reuse this to return a DataProvider which contains a SASL client
+        try {
+            PulsarSaslClient saslClient = new PulsarSaslClient(brokerHostName, jaasCredentialsContainer.getSubject());
+            return new SaslAuthenticationDataProvider(saslClient);
+        } catch (Throwable t) {
+            log.error("Failed create sasl client: {}", t);
+            throw new PulsarClientException(t);
+        }
+    }
+
+    @Override
+    public void configure(String encodedAuthParamString) {
+        if (isBlank(encodedAuthParamString)) {
+            log.info("authParams for SASL is be empty, will use default JAAS client section name: {}",
+                SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+        }
+
+        try {
+            setAuthParams(AuthenticationUtil.configureFromJsonString(encodedAuthParamString));
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Failed to parse SASL authParams", e);
+        }
+    }
+
+    @Override
+    @Deprecated
+    public void configure(Map<String, String> authParams) {
+        try {
+            setAuthParams(authParams);
+        }  catch (IOException e) {
+            throw new IllegalArgumentException("Failed to parse SASL authParams", e);
+        }
+    }
+
+    // use passed in parameter to config ange get jaasCredentialsContainer.
+    private void setAuthParams(Map<String, String> authParams) throws PulsarClientException {
+        this.configuration = authParams;
+
+        // read section from config files of kerberos
+        this.loginContextName = authParams
+            .getOrDefault(SaslConstants.JAAS_CLIENT_SECTION_NAME, SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+
+        // init the static jaasCredentialsContainer that shares amongst client.
+        if (!initializedJAAS) {
+            synchronized (this) {
+                if (jaasCredentialsContainer == null) {
+                    log.info("JAAS loginContext is: {}." , loginContextName);
+                    try {
+                        jaasCredentialsContainer = new JAASCredentialsContainer(
+                            loginContextName,
+                            new ClientCallbackHandler(),
+                            configuration);
+                        initializedJAAS = true;
+                    } catch (LoginException e) {
+                        log.error("JAAS login in client failed: {}" , e);
+                        throw new PulsarClientException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void start() throws PulsarClientException {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+}
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
new file mode 100644
index 0000000..23399a1
--- /dev/null
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.naming.AuthenticationException;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.sasl.KerberosName;
+import org.apache.pulsar.common.sasl.SaslConstants;
+
+/**
+ * A SASL Client object.
+ * This is added for support Kerberos authentication.
+ */
+@Slf4j
+public class PulsarSaslClient {
+    private final SaslClient saslClient;
+    private final Subject clientSubject;
+
+    public PulsarSaslClient(String serverHostname, Subject subject) throws SaslException {
+        checkArgument(subject != null, "Cannot create SASL client with NULL JAAS subject");
+        checkArgument(!Strings.isNullOrEmpty(serverHostname), "Cannot create SASL client with NUll server name");
+
+        String serverPrincipal = SaslConstants.SASL_PULSAR_PROTOCOL + "/" + serverHostname;
+        this.clientSubject = subject;
+        if (clientSubject.getPrincipals().isEmpty()) {
+            throw new SaslException("Cannot create SASL client with empty JAAS subject principal");
+        }
+        // GSSAPI/Kerberos
+        final Object[] principals = clientSubject.getPrincipals().toArray();
+        final Principal clientPrincipal = (Principal) principals[0];
+
+        final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
+        KerberosName serviceKerberosName = new KerberosName(serverPrincipal + "@" + clientKerberosName.getRealm());
+        final String serviceName = serviceKerberosName.getServiceName();
+        final String serviceHostname = serviceKerberosName.getHostName();
+        final String clientPrincipalName = clientKerberosName.toString();
+        log.info("Using JAAS/SASL/GSSAPI auth to connect to server Principal {},",
+            serverPrincipal);
+
+        try {
+            this.saslClient = Subject.doAs(clientSubject, new PrivilegedExceptionAction<SaslClient>() {
+                @Override
+                public SaslClient run() throws SaslException {
+                    String[] mechs = {"GSSAPI"};
+                    return Sasl.createSaslClient(mechs, clientPrincipalName, serviceName, serviceHostname, null,
+                        new ClientCallbackHandler());
+                }
+            });
+        } catch (PrivilegedActionException err) {
+            log.error("GSSAPI client error", err.getCause());
+            throw new SaslException("error while booting GSSAPI client", err.getCause());
+        }
+
+        if (saslClient == null) {
+            throw new SaslException("Cannot create JVM SASL Client");
+        }
+
+    }
+
+    public AuthData evaluateChallenge(final AuthData saslToken) throws AuthenticationException {
+        if (saslToken == null) {
+            throw new AuthenticationException("saslToken is null");
+        }
+        try {
+            if (clientSubject != null) {
+                final byte[] retval = Subject.doAs(clientSubject, new PrivilegedExceptionAction<byte[]>() {
+                    @Override
+                    public byte[] run() throws SaslException {
+                        return saslClient.evaluateChallenge(saslToken.getBytes());
+                    }
+                });
+                return AuthData.of(retval);
+
+            } else {
+                return AuthData.of(saslClient.evaluateChallenge(saslToken.getBytes()));
+            }
+        } catch (Exception e) {
+            log.error("SASL error", e.getCause());
+            throw new AuthenticationException("SASL/JAAS error" + e.getCause());
+        }
+    }
+
+    public boolean hasInitialResponse() {
+        return saslClient.hasInitialResponse();
+    }
+
+    static class ClientCallbackHandler implements CallbackHandler {
+        @Override
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof AuthorizeCallback) {
+                    handleAuthorizeCallback((AuthorizeCallback) callback);
+                } else {
+                    throw new UnsupportedCallbackException(callback, "Unrecognized SASL GSSAPI Client Callback.");
+                }
+            }
+        }
+
+        private void handleAuthorizeCallback(AuthorizeCallback ac) {
+            String authid = ac.getAuthenticationID();
+            String authzid = ac.getAuthorizationID();
+            if (authid.equals(authzid)) {
+                ac.setAuthorized(true);
+            } else {
+                ac.setAuthorized(false);
+            }
+            if (ac.isAuthorized()) {
+                ac.setAuthorizedID(authzid);
+            }
+            log.info("Successfully authenticated. authenticationID: {};  authorizationID: {}.",
+                authid, authzid);
+        }
+    }
+
+
+    public boolean isComplete() {
+        return saslClient.isComplete();
+    }
+
+}
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
new file mode 100644
index 0000000..6f38bd7
--- /dev/null
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import java.util.Arrays;
+
+import javax.naming.AuthenticationException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.common.api.AuthData;
+
+@Slf4j
+public class SaslAuthenticationDataProvider implements AuthenticationDataProvider {
+    private static final long serialVersionUID = 1L;
+
+    private PulsarSaslClient pulsarSaslClient;
+
+    public SaslAuthenticationDataProvider(PulsarSaslClient pulsarSaslClient) {
+        this.pulsarSaslClient = pulsarSaslClient;
+    }
+
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    // create token that evaluated by client, and will send to server.
+    @Override
+    public AuthData authenticate(AuthData commandData) throws AuthenticationException {
+        // init
+        if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA)) {
+            if (pulsarSaslClient.hasInitialResponse()) {
+                return pulsarSaslClient.evaluateChallenge(AuthData.of(new byte[0]));
+            }
+            return AuthData.of(new byte[0]);
+        }
+
+        return pulsarSaslClient.evaluateChallenge(commandData);
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
new file mode 100644
index 0000000..345bae8
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * JAAS Credentials Container.
+ * This is added for support Kerberos authentication.
+ */
+@Slf4j
+@Getter
+public class JAASCredentialsContainer implements Closeable {
+    private Subject subject;
+    private String principal;
+    private boolean isKrbTicket;
+    private boolean isUsingTicketCache;
+    private TGTRefreshThread ticketRefreshThread;
+
+    public CallbackHandler callbackHandler;
+    private String loginContextName;
+    private LoginContext loginContext;
+    private Map<String, String> configuration;
+
+    public JAASCredentialsContainer(String loginContextName,
+                                    CallbackHandler callbackHandler,
+                                    Map<String, String> configuration)
+        throws LoginException {
+        this.configuration = configuration;
+        this.callbackHandler = callbackHandler;
+        this.loginContextName = loginContextName;
+        AppConfigurationEntry[] entries = Configuration.getConfiguration()
+            .getAppConfigurationEntry(loginContextName);
+        if (entries == null) {
+            final String errorMessage = "loginContext name (JAAS file section header) was null. " +
+                "Please check your java.security.login.auth.config (=" +
+                System.getProperty("java.security.login.auth.config") +
+                ") for section header: " + this.loginContextName;
+            log.error("No JAAS Configuration section header found for Client: {}", errorMessage);
+            throw new LoginException(errorMessage);
+        }
+        LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
+        loginContext.login();
+        log.info("successfully logged in.");
+
+        this.loginContext = loginContext;
+        this.subject = loginContext.getSubject();
+        this.isKrbTicket = !this.subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+        if (isKrbTicket) {
+            this.isUsingTicketCache = SaslConstants.isUsingTicketCache(loginContextName);
+            this.principal = SaslConstants.getPrincipal(loginContextName);
+            this.ticketRefreshThread = new TGTRefreshThread(this);
+        } else {
+            throw new LoginException("Kerberos authentication without KerberosTicket provided!");
+        }
+
+        ticketRefreshThread.start();
+    }
+
+    void setLoginContext(LoginContext login) {
+        this.loginContext = login;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (ticketRefreshThread != null) {
+            ticketRefreshThread.interrupt();
+            try {
+                ticketRefreshThread.join(10000);
+            } catch (InterruptedException exit) {
+                Thread.currentThread().interrupt();
+                if (log.isDebugEnabled()) {
+                    log.debug("interrupted while waiting for TGT refresh thread to stop", exit);
+                }
+            }
+        }
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java
new file mode 100644
index 0000000..5f36fde
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class implements parsing and handling of Kerberos principal names. In
+ * particular, it splits them apart and translates them down into local
+ * operating system names.
+ *
+ * Copied from Apache ZooKeeper KerberosName.
+ */
+public class KerberosName {
+    /** The first component of the name */
+    private final String serviceName;
+    /** The second component of the name. It may be null. */
+    private final String hostName;
+    /** The realm of the name. */
+    private final String realm;
+
+    /**
+     * A pattern that matches a Kerberos name with at most 2 components.
+     */
+    private static final Pattern nameParser =
+        Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
+
+    /**
+     * A pattern that matches a string with out '$' and then a single
+     * parameter with $n.
+     */
+    private static Pattern parameterPattern =
+        Pattern.compile("([^$]*)(\\$(\\d*))?");
+
+    /**
+     * A pattern for parsing a auth_to_local rule.
+     */
+    private static final Pattern ruleParser =
+        Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
+            "(s/([^/]*)/([^/]*)/(g)?)?))");
+
+    /**
+     * A pattern that recognizes simple/non-simple names.
+     */
+    private static final Pattern nonSimplePattern = Pattern.compile("[/@]");
+
+    /**
+     * The list of translation rules.
+     */
+    private static List<Rule> rules;
+
+    private static String defaultRealm;
+
+    public static String getDefaultRealm2()
+        throws ClassNotFoundException, NoSuchMethodException,
+        IllegalArgumentException, IllegalAccessException,
+        InvocationTargetException {
+        Object kerbConf;
+        Class<?> classRef;
+        Method getInstanceMethod;
+        Method getDefaultRealmMethod;
+        if (System.getProperty("java.vendor").contains("IBM")) {
+            classRef = Class.forName("com.ibm.security.krb5.internal.Config");
+        } else {
+            classRef = Class.forName("sun.security.krb5.Config");
+        }
+        getInstanceMethod = classRef.getMethod("getInstance", new Class<?>[0]);
+        kerbConf = getInstanceMethod.invoke(classRef, new Object[0]);
+        getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm",
+            new Class<?>[0]);
+        return (String)getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
+    }
+
+    static {
+        try {
+            defaultRealm = getDefaultRealm2();
+        } catch (Exception ke) {
+            if ((System.getProperty("zookeeper.requireKerberosConfig") != null) &&
+                (System.getProperty("zookeeper.requireKerberosConfig").equals("true"))) {
+                throw new IllegalArgumentException("Can't get Kerberos configuration",ke);
+            }
+            else
+                defaultRealm="";
+        }
+        try {
+            // setConfiguration() will work even if the above try() fails due
+            // to a missing Kerberos configuration (unless zookeeper.requireKerberosConfig
+            // is set to true, which would not allow execution to reach here due to the
+            // throwing of an IllegalArgumentException above).
+            setConfiguration();
+        }
+        catch (IOException e) {
+            throw new IllegalArgumentException("Could not configure Kerberos principal name mapping.");
+        }
+    }
+
+    /**
+     * Create a name from the full Kerberos principal name.
+     * @param name
+     */
+    public KerberosName(String name) {
+        Matcher match = nameParser.matcher(name);
+        if (!match.matches()) {
+            if (name.contains("@")) {
+                throw new IllegalArgumentException("Malformed Kerberos name: " + name);
+            } else {
+                serviceName = name;
+                hostName = null;
+                realm = null;
+            }
+        } else {
+            serviceName = match.group(1);
+            hostName = match.group(3);
+            realm = match.group(4);
+        }
+    }
+
+    /**
+     * Get the configured default realm.
+     * @return the default realm from the krb5.conf
+     */
+    public String getDefaultRealm() {
+        return defaultRealm;
+    }
+
+    /**
+     * Put the name back together from the parts.
+     */
+    @Override
+    public String toString() {
+        StringBuilder result = new StringBuilder();
+        result.append(serviceName);
+        if (hostName != null) {
+            result.append('/');
+            result.append(hostName);
+        }
+        if (realm != null) {
+            result.append('@');
+            result.append(realm);
+        }
+        return result.toString();
+    }
+
+    /**
+     * Get the first component of the name.
+     * @return the first section of the Kerberos principal name
+     */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * Get the second component of the name.
+     * @return the second section of the Kerberos principal name, and may be null
+     */
+    public String getHostName() {
+        return hostName;
+    }
+
+    /**
+     * Get the realm of the name.
+     * @return the realm of the name, may be null
+     */
+    public String getRealm() {
+        return realm;
+    }
+
+    /**
+     * An encoding of a rule for translating kerberos names.
+     */
+    private static class Rule {
+        private final boolean isDefault;
+        private final int numOfComponents;
+        private final String format;
+        private final Pattern match;
+        private final Pattern fromPattern;
+        private final String toPattern;
+        private final boolean repeat;
+
+        Rule() {
+            isDefault = true;
+            numOfComponents = 0;
+            format = null;
+            match = null;
+            fromPattern = null;
+            toPattern = null;
+            repeat = false;
+        }
+
+        Rule(int numOfComponents, String format, String match, String fromPattern,
+             String toPattern, boolean repeat) {
+            isDefault = false;
+            this.numOfComponents = numOfComponents;
+            this.format = format;
+            this.match = match == null ? null : Pattern.compile(match);
+            this.fromPattern =
+                fromPattern == null ? null : Pattern.compile(fromPattern);
+            this.toPattern = toPattern;
+            this.repeat = repeat;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            if (isDefault) {
+                buf.append("DEFAULT");
+            } else {
+                buf.append("RULE:[");
+                buf.append(numOfComponents);
+                buf.append(':');
+                buf.append(format);
+                buf.append(']');
+                if (match != null) {
+                    buf.append('(');
+                    buf.append(match);
+                    buf.append(')');
+                }
+                if (fromPattern != null) {
+                    buf.append("s/");
+                    buf.append(fromPattern);
+                    buf.append('/');
+                    buf.append(toPattern);
+                    buf.append('/');
+                    if (repeat) {
+                        buf.append('g');
+                    }
+                }
+            }
+            return buf.toString();
+        }
+
+        /**
+         * Replace the numbered parameters of the form $n where n is from 1 to
+         * the length of params. Normal text is copied directly and $n is replaced
+         * by the corresponding parameter.
+         * @param format the string to replace parameters again
+         * @param params the list of parameters
+         * @return the generated string with the parameter references replaced.
+         * @throws BadFormatString
+         */
+        static String replaceParameters(String format,
+                                        String[] params) throws BadFormatString {
+            Matcher match = parameterPattern.matcher(format);
+            int start = 0;
+            StringBuilder result = new StringBuilder();
+            while (start < format.length() && match.find(start)) {
+                result.append(match.group(1));
+                String paramNum = match.group(3);
+                if (paramNum != null) {
+                    try {
+                        int num = Integer.parseInt(paramNum);
+                        if (num < 0 || num > params.length) {
+                            throw new BadFormatString("index " + num + " from " + format +
+                                " is outside of the valid range 0 to " +
+                                (params.length - 1));
+                        }
+                        result.append(params[num]);
+                    } catch (NumberFormatException nfe) {
+                        throw new BadFormatString("bad format in username mapping in " +
+                            paramNum, nfe);
+                    }
+
+                }
+                start = match.end();
+            }
+            return result.toString();
+        }
+
+        /**
+         * Replace the matches of the from pattern in the base string with the value
+         * of the to string.
+         * @param base the string to transform
+         * @param from the pattern to look for in the base string
+         * @param to the string to replace matches of the pattern with
+         * @param repeat whether the substitution should be repeated
+         * @return
+         */
+        static String replaceSubstitution(String base, Pattern from, String to,
+                                          boolean repeat) {
+            Matcher match = from.matcher(base);
+            if (repeat) {
+                return match.replaceAll(to);
+            } else {
+                return match.replaceFirst(to);
+            }
+        }
+
+        /**
+         * Try to apply this rule to the given name represented as a parameter
+         * array.
+         * @param params first element is the realm, second and later elements are
+         *        are the components of the name "a/b@FOO" -> {"FOO", "a", "b"}
+         * @return the short name if this rule applies or null
+         * @throws IOException throws if something is wrong with the rules
+         */
+        String apply(String[] params) throws IOException {
+            String result = null;
+            if (isDefault) {
+                if (defaultRealm.equals(params[0])) {
+                    result = params[1];
+                }
+            } else if (params.length - 1 == numOfComponents) {
+                String base = replaceParameters(format, params);
+                if (match == null || match.matcher(base).matches()) {
+                    if (fromPattern == null) {
+                        result = base;
+                    } else {
+                        result = replaceSubstitution(base, fromPattern, toPattern,  repeat);
+                    }
+                }
+            }
+            if (result != null && nonSimplePattern.matcher(result).find()) {
+                throw new NoMatchingRule("Non-simple name " + result +
+                    " after auth_to_local rule " + this);
+            }
+            return result;
+        }
+    }
+
+    static List<Rule> parseRules(String rules) {
+        List<Rule> result = new ArrayList<Rule>();
+        String remaining = rules.trim();
+        while (remaining.length() > 0) {
+            Matcher matcher = ruleParser.matcher(remaining);
+            if (!matcher.lookingAt()) {
+                throw new IllegalArgumentException("Invalid rule: " + remaining);
+            }
+            if (matcher.group(2) != null) {
+                result.add(new Rule());
+            } else {
+                result.add(new Rule(Integer.parseInt(matcher.group(4)),
+                    matcher.group(5),
+                    matcher.group(7),
+                    matcher.group(9),
+                    matcher.group(10),
+                    "g".equals(matcher.group(11))));
+            }
+            remaining = remaining.substring(matcher.end());
+        }
+        return result;
+    }
+
+    /**
+     * Set the static configuration to get the rules.
+     * @param conf the new configuration
+     * @throws IOException
+     */
+    public static void setConfiguration() throws IOException {
+        String ruleString = System.getProperty("zookeeper.security.auth_to_local", "DEFAULT");
+        rules = parseRules(ruleString);
+    }
+
+    @SuppressWarnings("serial")
+    public static class BadFormatString extends IOException {
+        BadFormatString(String msg) {
+            super(msg);
+        }
+        BadFormatString(String msg, Throwable err) {
+            super(msg, err);
+        }
+    }
+
+    @SuppressWarnings("serial")
+    public static class NoMatchingRule extends IOException {
+        NoMatchingRule(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
+     * Get the translation of the principal name into an operating system
+     * user name.
+     * @return the short name
+     * @throws IOException
+     */
+    public String getShortName() throws IOException {
+        String[] params;
+        if (hostName == null) {
+            // if it is already simple, just return it
+            if (realm == null) {
+                return serviceName;
+            }
+            params = new String[]{realm, serviceName};
+        } else {
+            params = new String[]{realm, serviceName, hostName};
+        }
+        for(Rule r: rules) {
+            String result = r.apply(params);
+            if (result != null) {
+                return result;
+            }
+        }
+        throw new NoMatchingRule("No rules applied to " + toString());
+    }
+
+    static void printRules() throws IOException {
+        int i = 0;
+        for(Rule r: rules) {
+            System.out.println(++i + " " + r);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        for(String arg: args) {
+            KerberosName name = new KerberosName(arg);
+            System.out.println("Name: " + name + " to " + name.getShortName());
+        }
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
new file mode 100644
index 0000000..749d411
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
+/**
+ * SASL Constants.
+ */
+public class SaslConstants {
+
+    public static final String AUTH_METHOD_NAME = "sasl";
+
+    // service broker Principal
+    public static final String JAAS_BROKER_SECTION_NAME = "saslJaasBrokerSectionName";
+    public static final String JAAS_DEFAULT_BROKER_SECTION_NAME = "PulsarBroker";
+
+    //TODO: for sasl proxy.
+    // github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
+    public static final String JAAS_PROXY_SECTION_NAME = "saslJaasProxySectionName";
+    public static final String JAAS_DEFAULT_PROXY_SECTION_NAME = "PulsarProxy";
+
+    // Client principal
+    public static final String JAAS_CLIENT_SECTION_NAME = "saslJaasClientSectionName";
+    public static final String JAAS_DEFAULT_CLIENT_SECTION_NAME = "PulsarClient";
+
+    /**
+     * This is a regexp which limits the range of possible ids which can connect to the Broker using SASL.
+     * By default only clients whose id contains 'pulsar' are allowed to connect.
+     */
+    public static final String JAAS_CLIENT_ALLOWED_IDS = "saslJaasClientAllowedIds";
+    public static final String JAAS_CLIENT_ALLOWED_IDS_DEFAULT = ".*pulsar.*";
+
+    public static final String KINIT_COMMAND_DEFAULT = "/usr/bin/kinit";
+
+    public static final String KINIT_COMMAND = "kerberos.kinit";
+
+    // The non-null string name of the protocol for which the authentication is being performed (e.g., "ldap").
+    public static final String SASL_PULSAR_PROTOCOL = "broker";
+    // The non-null fully-qualified host name of the server to authenticate to.
+    public static final String SASL_PULSAR_REALM = "EXAMPLE.COM";
+
+    // Stand for the start of mutual auth between Client and Broker
+    public static final String INIT_PROVIDER_DATA = "isInit";
+
+    public static boolean isUsingTicketCache(String configurationEntry) {
+        AppConfigurationEntry[] entries = Configuration.getConfiguration()
+            .getAppConfigurationEntry(configurationEntry);
+        if (entries == null) {
+            return false;
+        }
+        for (AppConfigurationEntry entry : entries) {
+            // there will only be a single entry, so this for() loop will only be iterated through once.
+            if (entry.getOptions().get("useTicketCache") != null) {
+                String val = (String) entry.getOptions().get("useTicketCache");
+                if (val.equals("true")) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public static String getPrincipal(String configurationEntry) {
+
+        AppConfigurationEntry[] entries = Configuration.getConfiguration()
+            .getAppConfigurationEntry(configurationEntry);
+        if (entries == null) {
+            return null;
+        }
+        for (AppConfigurationEntry entry : entries) {
+            if (entry.getOptions().get("principal") != null) {
+                return (String) entry.getOptions().get("principal");
+            }
+        }
+        return null;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
new file mode 100644
index 0000000..424b02b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.Set;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * TGT Refresh Thread. Copied from Apache ZooKeeper TGT refresh logic.
+ */
+@Slf4j
+public class TGTRefreshThread extends Thread {
+
+    private static final Random rng = new Random();
+
+    private long lastLogin;
+    private final JAASCredentialsContainer container;
+
+    public long getLastLogin() {
+        return lastLogin;
+    }
+
+    public void setLastLogin(long lastLogin) {
+        this.lastLogin = lastLogin;
+    }
+
+    public TGTRefreshThread(JAASCredentialsContainer container) {
+        this.container = container;
+        // Initialize 'lastLogin' to do a login at first time
+        this.lastLogin = System.currentTimeMillis() - MIN_TIME_BEFORE_RELOGIN;
+        setDaemon(true);
+        setName("pulsar-tgt-refresh-thread");
+    } // Initialize 'lastLogin' to do a login at first time
+
+    private synchronized KerberosTicket getTGT() {
+        Set<KerberosTicket> tickets = container.getSubject().getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                log.info("Client principal is \"" + ticket.getClient().getName() + "\".");
+                log.info("Server principal is \"" + ticket.getServer().getName() + "\".");
+                return ticket;
+            }
+        }
+        return null;
+    }
+    // LoginThread will sleep until 80% of time from last refresh to
+    // ticket's expiry has been reached, at which time it will wake
+    // and try to renew the ticket.
+    private static final float TICKET_RENEW_WINDOW = 0.80f;
+    /**
+     * Percentage of random jitter added to the renewal time.
+     */
+    private static final float TICKET_RENEW_JITTER = 0.05f;
+    // Regardless of TICKET_RENEW_WINDOW setting above and the ticket expiry time,
+    // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+    // Change the '1' to e.g. 5, to change this to 5 minutes.
+    private static final long MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
+
+    private long getRefreshTime(KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long expires = tgt.getEndTime().getTime();
+        log.info("TGT valid starting at:        {}", tgt.getStartTime().toString());
+        log.info("TGT expires:                  {}", tgt.getEndTime().toString());
+        long proposedRefresh = start
+            + (long) ((expires - start) * (TICKET_RENEW_WINDOW + (TICKET_RENEW_JITTER * rng.nextDouble())));
+        if (proposedRefresh > expires) {
+            // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+            return System.currentTimeMillis();
+        } else {
+            return proposedRefresh;
+        }
+    }
+
+    @Override
+    public void run() {
+        log.info("TGT refresh thread started.");
+        while (true) {
+            // renewal thread's main loop. if it exits from here, thread will exit.
+            KerberosTicket tgt = getTGT();
+            long now = System.currentTimeMillis();
+            long nextRefresh;
+            Date nextRefreshDate;
+            if (tgt == null) {
+                nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
+                nextRefreshDate = new Date(nextRefresh);
+                log.warn("No TGT found: will try again at {}", nextRefreshDate);
+            } else {
+                nextRefresh = getRefreshTime(tgt);
+                long expiry = tgt.getEndTime().getTime();
+                Date expiryDate = new Date(expiry);
+                if ((container.isUsingTicketCache()) && (tgt.getEndTime().equals(tgt.getRenewTill()))) {
+                    Object[] logPayload = {expiryDate, container.getPrincipal(), container.getPrincipal()};
+                    log.error("The TGT cannot be renewed beyond the next expiry date: {}."
+                        + "This process will not be able to authenticate new SASL connections after that "
+                        + "time (for example, it will not be authenticate a new connection with a Broker "
+                        + ").  Ask your system administrator to either increase the "
+                        + "'renew until' time by doing : 'modprinc -maxrenewlife {}' within "
+                        + "kadmin, or instead, to generate a keytab for {}. Because the TGT's "
+                        + "expiry cannot be further extended by refreshing, exiting refresh thread now.", logPayload);
+                    return;
+                }
+                // determine how long to sleep from looking at ticket's expiry.
+                // We should not allow the ticket to expire, but we should take into consideration
+                // MIN_TIME_BEFORE_RELOGIN. Will not sleep less than MIN_TIME_BEFORE_RELOGIN, unless doing so
+                // would cause ticket expiration.
+                if ((nextRefresh > expiry) || ((now + MIN_TIME_BEFORE_RELOGIN) > expiry)) {
+                    // expiry is before next scheduled refresh).
+                    nextRefresh = now;
+                } else {
+                    if (nextRefresh < (now + MIN_TIME_BEFORE_RELOGIN)) {
+                        // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                        Date until = new Date(nextRefresh);
+                        Date newuntil = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+                        Object[] logPayload = {until, newuntil, MIN_TIME_BEFORE_RELOGIN / 1000};
+                        log.warn("TGT refresh thread time adjusted from : {} to : {} since "
+                            + "the former is sooner than the minimum refresh interval ("
+                            + "{} seconds) from now.", logPayload);
+                    }
+                    nextRefresh = Math.max(nextRefresh, now + MIN_TIME_BEFORE_RELOGIN);
+                }
+                nextRefreshDate = new Date(nextRefresh);
+                if (nextRefresh > expiry) {
+                    Object[] logPayload = {nextRefreshDate, expiryDate};
+                    log.error("next refresh: {} is later than expiry {}." + " This may indicate a clock skew problem."
+                            + "Check that this host and the KDC's " + "hosts' clocks are in sync. Exiting refresh thread.",
+                        logPayload);
+                    return;
+                }
+            }
+            if (now == nextRefresh) {
+                log.info("refreshing now because expiry is before next scheduled refresh time.");
+            } else if (now < nextRefresh) {
+                Date until = new Date(nextRefresh);
+                log.info("TGT refresh sleeping until: {}", until.toString());
+                try {
+                    Thread.sleep(nextRefresh - now);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    log.warn("TGT renewal thread has been interrupted and will exit.");
+                    break;
+                }
+            } else {
+                log.error("nextRefresh:{} is in the past: exiting refresh thread. Check"
+                    + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+                    + " Manual intervention will be required for this client to successfully authenticate."
+                    + " Exiting refresh thread.", nextRefreshDate);
+                break;
+            }
+            if (container.isUsingTicketCache()) {
+                String cmd = container.getConfiguration().getOrDefault(SaslConstants.KINIT_COMMAND,
+                    SaslConstants.KINIT_COMMAND_DEFAULT);
+                String kinitArgs = "-R";
+                int retry = 1;
+                while (retry >= 0) {
+                    try {
+                        log.info("running ticket cache refresh command: {} {}", cmd, kinitArgs);
+
+                        ProcessBuilder processBuilder = new ProcessBuilder();
+                        processBuilder.command("bash", "-c", cmd, kinitArgs);
+                        break;
+                    } catch (Exception e) {
+                        if (retry > 0) {
+                            --retry;
+                            // sleep for 10 seconds
+                            try {
+                                Thread.sleep(10 * 1000);
+                            } catch (InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                log.error("Interrupted while renewing TGT, exiting Login thread");
+                                return;
+                            }
+                        } else {
+                            Object[] logPayload = {cmd, kinitArgs, e.toString(), e};
+                            log.warn("Could not renew TGT due to problem running shell command: '{}"
+                                + " {}'; exception was:{}. Exiting refresh thread.", logPayload);
+                            return;
+                        }
+                    }
+                }
+            }
+            try {
+                int retry = 1;
+                while (retry >= 0) {
+                    try {
+                        reLogin();
+                        break;
+                    } catch (LoginException le) {
+                        if (retry > 0) {
+                            --retry;
+                            // sleep for 10 seconds.
+                            try {
+                                Thread.sleep(10 * 1000);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                log.error("Interrupted during login retry after LoginException:", le);
+                                throw le;
+                            }
+                        } else {
+                            log.error("Could not refresh TGT for principal: {}.", container.getPrincipal(), le);
+                        }
+                    }
+                }
+            } catch (LoginException le) {
+                log.error("Failed to refresh TGT: refresh thread exiting now.", le);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
+     * c.f. HADOOP-6559
+     * @throws LoginException on a failure
+     */
+    private synchronized void reLogin() throws LoginException {
+        LoginContext login = container.getLoginContext();
+        if (login == null) {
+            throw new LoginException("login must be done first");
+        }
+        if (!hasSufficientTimeElapsed()) {
+            return;
+        }
+        log.info("Initiating logout for {}", container.getPrincipal());
+        synchronized (this) {
+            //clear up the kerberos state. But the tokens are not cleared! As per
+            //the Java kerberos login module code, only the kerberos credentials
+            //are cleared
+            login.logout();
+            //login and also update the subject field of this instance to
+            //have the new credentials (pass it to the LoginContext constructor)
+            login = new LoginContext(container.getLoginContextName(), container.getSubject());
+            log.info("Initiating re-login for {}", container.getPrincipal());
+            login.login();
+            container.setLoginContext(login);
+        }
+    }
+
+    private boolean hasSufficientTimeElapsed() {
+        long now = System.currentTimeMillis();
+        if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN) {
+            log.warn("Not attempting to re-login since the last re-login was "
+                + "attempted less than {} seconds before.", MIN_TIME_BEFORE_RELOGIN / 1000);
+            return false;
+        }
+        // register most recent relogin attempt
+        setLastLogin(now);
+        return true;
+    }
+
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index b984f81..edc0c1e 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -245,7 +245,10 @@ public class WebSocketService implements Closeable {
     public boolean isAuthenticationEnabled() {
         if (this.config == null)
             return false;
-        return this.config.isAuthenticationEnabled();
+        // TODO: isSaslAuthentication used to bypass web resource check.
+        //  will remove it after implementation the support.
+        //  github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+        return this.config.isAuthenticationEnabled() && !this.config.isSaslAuthentication();
     }
 
     public boolean isAuthorizationEnabled() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
index 10e3664..e6eb158 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
@@ -96,7 +96,7 @@ public class WebSocketWebResource {
      *             if not authorized
      */
     protected void validateSuperUserAccess() {
-        if (service().getConfig().isAuthenticationEnabled()) {
+        if (service().getConfig().isAuthenticationEnabled() && !service().getConfig().isSaslAuthentication()) {
             String appId = clientAppId();
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),