You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/05/26 07:18:47 UTC

[kafka] branch trunk updated: KAFKA-6562: OAuth Authentication via SASL/OAUTHBEARER (KIP-255) (#4994)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c5d7e0  KAFKA-6562: OAuth Authentication via SASL/OAUTHBEARER (KIP-255) (#4994)
8c5d7e0 is described below

commit 8c5d7e0408a62aa5f414e8b707050bf8e313a57e
Author: Ron Dagostino <rn...@gmail.com>
AuthorDate: Sat May 26 03:18:41 2018 -0400

    KAFKA-6562: OAuth Authentication via SASL/OAUTHBEARER (KIP-255) (#4994)
    
    This KIP adds the following functionality related to SASL/OAUTHBEARER:
    
    1) Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server based on the declaration of a custom login CallbackHandler implementation and have that access token transparently and automatically transmitted to a broker for authentication.
    
    2) Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on the declaration of a custom SASL Server CallbackHandler implementation.
    
    3) Provide implementations of the above retrieval and validation features based on an unsecured JSON Web Token that function out-of-the-box with minimal configuration required (i.e. implementations of the two types of callback handlers mentioned above will be used by default with no need to explicitly declare them).
    
    4) Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.
---
 build.gradle                                       |   2 +
 checkstyle/import-control.xml                      |   3 +
 .../apache/kafka/common/config/SaslConfigs.java    |  33 ++
 .../kafka/common/network/SaslChannelBuilder.java   |  30 +-
 .../security/authenticator/LoginManager.java       |  10 +-
 .../oauthbearer/OAuthBearerLoginModule.java        | 330 +++++++++++
 .../security/oauthbearer/OAuthBearerToken.java     | 105 ++++
 .../oauthbearer/OAuthBearerTokenCallback.java      | 122 ++++
 .../oauthbearer/OAuthBearerValidatorCallback.java  | 154 ++++++
 .../internal/OAuthBearerRefreshingLogin.java       | 153 +++++
 .../internal/OAuthBearerSaslClient.java            | 178 ++++++
 .../OAuthBearerSaslClientCallbackHandler.java      |  96 ++++
 .../internal/OAuthBearerSaslClientProvider.java    |  37 ++
 .../internal/OAuthBearerSaslServer.java            | 224 ++++++++
 .../internal/OAuthBearerSaslServerProvider.java    |  37 ++
 .../internal/expiring/ExpiringCredential.java      |  66 +++
 .../expiring/ExpiringCredentialRefreshConfig.java  | 124 +++++
 .../ExpiringCredentialRefreshingLogin.java         | 429 ++++++++++++++
 .../unsecured/OAuthBearerConfigException.java      |  35 ++
 .../OAuthBearerIllegalTokenException.java          |  53 ++
 .../internal/unsecured/OAuthBearerScopeUtils.java  |  75 +++
 .../unsecured/OAuthBearerUnsecuredJws.java         | 371 +++++++++++++
 .../OAuthBearerUnsecuredLoginCallbackHandler.java  | 288 ++++++++++
 ...uthBearerUnsecuredValidatorCallbackHandler.java | 217 ++++++++
 .../unsecured/OAuthBearerValidationResult.java     | 126 +++++
 .../unsecured/OAuthBearerValidationUtils.java      | 200 +++++++
 .../kafka/common/config/SaslConfigsTest.java       | 124 +++++
 .../authenticator/SaslAuthenticatorTest.java       |  32 ++
 .../security/authenticator/TestJaasConfig.java     |  20 +-
 .../oauthbearer/OAuthBearerLoginModuleTest.java    | 308 +++++++++++
 .../oauthbearer/OAuthBearerTokenCallbackTest.java  |  78 +++
 .../OAuthBearerValidatorCallbackTest.java          |  78 +++
 .../internal/OAuthBearerSaslServerTest.java        | 113 ++++
 .../ExpiringCredentialRefreshConfigTest.java       |  43 ++
 .../ExpiringCredentialRefreshingLoginTest.java     | 615 +++++++++++++++++++++
 .../unsecured/OAuthBearerScopeUtilsTest.java       |  53 ++
 .../unsecured/OAuthBearerUnsecuredJwsTest.java     | 156 ++++++
 ...uthBearerUnsecuredLoginCallbackHandlerTest.java | 127 +++++
 ...earerUnsecuredValidatorCallbackHandlerTest.java | 182 ++++++
 .../unsecured/OAuthBearerValidationUtilsTest.java  | 248 +++++++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   6 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  16 +
 ...slOAuthBearerSslEndToEndAuthorizationTest.scala |  26 +
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   4 +
 .../scala/unit/kafka/utils/JaasTestUtils.scala     |  27 +-
 docs/security.html                                 | 221 +++++++-
 46 files changed, 5956 insertions(+), 19 deletions(-)

diff --git a/build.gradle b/build.gradle
index 3e4388a..ab16b42 100644
--- a/build.gradle
+++ b/build.gradle
@@ -787,6 +787,7 @@ project(':clients') {
     compile libs.lz4
     compile libs.snappy
     compile libs.slf4jApi
+    compile libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
 
     testCompile libs.bcpkix
     testCompile libs.junit
@@ -858,6 +859,7 @@ project(':clients') {
     include "**/org/apache/kafka/common/security/auth/*"
     include "**/org/apache/kafka/server/policy/*"
     include "**/org/apache/kafka/common/security/token/delegation/*"
+    include "**/org/apache/kafka/common/security/oauthbearer/*"
   }
 }
 
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 1afb83a..e4f9a4e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -101,6 +101,9 @@
       <subpackage name="scram">
         <allow pkg="javax.crypto" />
       </subpackage>
+      <subpackage name="oauthbearer">
+        <allow pkg="com.fasterxml.jackson.databind" />
+      </subpackage>
     </subpackage>
 
     <subpackage name="protocol">
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 41919f9..db93ea4 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.config;
 
+import org.apache.kafka.common.config.ConfigDef.Range;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 
 import java.util.List;
@@ -89,6 +90,34 @@ public class SaslConfigs {
     public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "Login thread sleep time between refresh attempts.";
     public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
 
+    public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR = "sasl.login.refresh.window.factor";
+    public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC = "Login refresh thread will sleep until the specified window factor relative to the"
+            + " credential's lifetime has been reached, at which time it will try to refresh the credential."
+            + " Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used"
+            + " if no value is specified. Currently applies only to OAUTHBEARER.";
+    public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80;
+
+    public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER = "sasl.login.refresh.window.jitter";
+    public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC = "The maximum amount of random jitter relative to the credential's lifetime"
+            + " that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;"
+            + " a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.";
+    public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05;
+
+    public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS = "sasl.login.refresh.min.period.seconds";
+    public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC = "The desired minimum time for the login refresh thread to wait before refreshing a credential,"
+            + " in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified.  This value and "
+            + " sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
+            + " Currently applies only to OAUTHBEARER.";
+    public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60;
+
+    public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS = "sasl.login.refresh.buffer.seconds";
+    public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC = "The amount of buffer time before credential expiration to maintain when refreshing a credential,"
+            + " in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain"
+            + " as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of  300 (5 minutes) is used if no value is specified."
+            + " This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
+            + " Currently applies only to OAUTHBEARER.";
+    public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300;
+
     /**
      * @deprecated As of 1.0.0. This field will be removed in a future major release.
      */
@@ -111,6 +140,10 @@ public class SaslConfigs {
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                .define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR, Range.between(0.5, 1.0), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER, Range.between(0.0, 0.25), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC)
+                .define(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS, Range.between(0, 900), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC)
+                .define(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, Range.between(0, 3600), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC)
                 .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)
                 .define(SaslConfigs.SASL_JAAS_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_JAAS_CONFIG_DOC)
                 .define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 3985c7e..15df089 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -35,6 +35,10 @@ import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler;
 import org.apache.kafka.common.security.kerberos.KerberosClientCallbackHandler;
 import org.apache.kafka.common.security.kerberos.KerberosLogin;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerRefreshingLogin;
+import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslClientCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.internal.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler;
 import org.apache.kafka.common.security.plain.internal.PlainSaslServer;
 import org.apache.kafka.common.security.plain.internal.PlainServerCallbackHandler;
 import org.apache.kafka.common.security.scram.ScramCredential;
@@ -118,7 +122,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                 entry.getValue().configure(configs, mechanism, jaasContexts.get(mechanism).configurationEntries());
             }
 
-            Class<? extends Login> defaultLoginClass = DefaultLogin.class;
+            Class<? extends Login> defaultLoginClass = defaultLoginClass(configs);
             if (jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM)) {
                 String defaultRealm;
                 try {
@@ -130,7 +134,6 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                 List<String> principalToLocalRules = (List<String>) configs.get(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG);
                 if (principalToLocalRules != null)
                     kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
-                defaultLoginClass = KerberosLogin.class;
             }
             for (Map.Entry<String, JaasContext> entry : jaasContexts.entrySet()) {
                 String mechanism = entry.getKey();
@@ -271,7 +274,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
     private void createClientCallbackHandler(Map<String, ?> configs) {
         Class<? extends AuthenticateCallbackHandler> clazz = (Class<? extends AuthenticateCallbackHandler>) configs.get(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS);
         if (clazz == null)
-            clazz = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM) ? KerberosClientCallbackHandler.class : SaslClientCallbackHandler.class;
+            clazz = clientCallbackHandlerClass();
         AuthenticateCallbackHandler callbackHandler = Utils.newInstance(clazz);
         saslCallbackHandlers.put(clientSaslMechanism, callbackHandler);
     }
@@ -288,9 +291,30 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                 callbackHandler = new PlainServerCallbackHandler();
             else if (ScramMechanism.isScram(mechanism))
                 callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class), tokenCache);
+            else if (mechanism.equals(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM))
+                callbackHandler = new OAuthBearerUnsecuredValidatorCallbackHandler();
             else
                 callbackHandler = new SaslServerCallbackHandler();
             saslCallbackHandlers.put(mechanism, callbackHandler);
         }
     }
+
+    private Class<? extends Login> defaultLoginClass(Map<String, ?> configs) {
+        if (jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM))
+            return KerberosLogin.class;
+        if (OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(clientSaslMechanism))
+            return OAuthBearerRefreshingLogin.class;
+        return DefaultLogin.class;
+    }
+
+    private Class<? extends AuthenticateCallbackHandler> clientCallbackHandlerClass() {
+        switch (clientSaslMechanism) {
+            case SaslConfigs.GSSAPI_MECHANISM:
+                return KerberosClientCallbackHandler.class;
+            case OAuthBearerLoginModule.OAUTHBEARER_MECHANISM:
+                return OAuthBearerSaslClientCallbackHandler.class;
+            default:
+                return SaslClientCallbackHandler.class;
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 4ae798d..69ed7b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.Login;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.internal.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,9 +92,11 @@ public class LoginManager {
                                                    Map<String, ?> configs) throws IOException, LoginException {
         Class<? extends Login> loginClass = configuredClassOrDefault(configs, jaasContext,
                 saslMechanism, SaslConfigs.SASL_LOGIN_CLASS, defaultLoginClass);
-        Class<? extends AuthenticateCallbackHandler> loginCallbackClass = configuredClassOrDefault(configs,
-                jaasContext, saslMechanism, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
-                AbstractLogin.DefaultLoginCallbackHandler.class);
+        Class<? extends AuthenticateCallbackHandler> defaultLoginCallbackHandlerClass = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM
+                .equals(saslMechanism) ? OAuthBearerUnsecuredLoginCallbackHandler.class
+                        : AbstractLogin.DefaultLoginCallbackHandler.class;
+        Class<? extends AuthenticateCallbackHandler> loginCallbackClass = configuredClassOrDefault(configs, jaasContext,
+                saslMechanism, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, defaultLoginCallbackHandlerClass);
         synchronized (LoginManager.class) {
             LoginManager loginManager;
             Password jaasConfigValue = jaasContext.dynamicJaasConfig();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
new file mode 100644
index 0000000..ddadb99
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.Login;
+import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslClientProvider;
+import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslServerProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code LoginModule} for the SASL/OAUTHBEARER mechanism. When a client
+ * (whether a non-broker client or a broker when SASL/OAUTHBEARER is the
+ * inter-broker protocol) connects to Kafka the {@code OAuthBearerLoginModule}
+ * instance asks its configured {@link AuthenticateCallbackHandler}
+ * implementation to handle an instance of {@link OAuthBearerTokenCallback} and
+ * return an instance of {@link OAuthBearerToken}. A default, builtin
+ * {@link AuthenticateCallbackHandler} implementation creates an unsecured token
+ * as defined by these JAAS module options:
+ * <p>
+ * <table>
+ * <tr>
+ * <th>JAAS Module Option for Unsecured Token Retrieval</th>
+ * <th>Documentation</th>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredLoginStringClaim_<claimname>="value"}</td>
+ * <td>Creates a {@code String} claim with the given name and value. Any valid
+ * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
+ * automatically generated).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredLoginNumberClaim_<claimname>="value"}</td>
+ * <td>Creates a {@code Number} claim with the given name and value. Any valid
+ * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
+ * automatically generated).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredLoginListClaim_<claimname>="value"}</td>
+ * <td>Creates a {@code String List} claim with the given name and values parsed
+ * from the given value where the first character is taken as the delimiter. For
+ * example: {@code unsecuredLoginListClaim_fubar="|value1|value2"}. Any valid
+ * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
+ * automatically generated).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredLoginPrincipalClaimName}</td>
+ * <td>Set to a custom claim name if you wish the name of the {@code String}
+ * claim holding the principal name to be something other than
+ * '{@code sub}'.</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredLoginLifetimeSeconds}</td>
+ * <td>Set to an integer value if the token expiration is to be set to something
+ * other than the default value of 3600 seconds (which is 1 hour). The
+ * '{@code exp}' claim will be set to reflect the expiration time.</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredLoginScopeClaimName}</td>
+ * <td>Set to a custom claim name if you wish the name of the {@code String} or
+ * {@code String List} claim holding any token scope to be something other than
+ * '{@code scope}'.</td>
+ * </tr>
+ * </table>
+ * <p>
+ * Production use cases will require writing an implementation of
+ * {@link AuthenticateCallbackHandler} that can handle an instance of
+ * {@link OAuthBearerTokenCallback} and declaring it via either the
+ * {@code sasl.login.callback.handler.class} configuration option for a
+ * non-broker client or via the
+ * {@code listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class}
+ * configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker
+ * protocol).
+ * <p>
+ * This class stores the retrieved {@link OAuthBearerToken} in the
+ * {@code Subject}'s private credentials where the {@code SaslClient} can
+ * retrieve it. An appropriate, builtin {@code SaslClient} implementation is
+ * automatically used and configured such that it can perform that retrieval.
+ * <p>
+ * Here is a typical, basic JAAS configuration for a client leveraging unsecured
+ * SASL/OAUTHBEARER authentication:
+ * 
+ * <pre>
+ * KafkaClient {
+ *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
+ *      unsecuredLoginStringClaim_sub="thePrincipalName";
+ * };
+ * </pre>
+ * 
+ * An implementation of the {@link Login} interface specific to the
+ * {@code OAUTHBEARER} mechanism is automatically applied; it periodically
+ * refreshes any token before it expires so that the client can continue to make
+ * connections to brokers. The parameters that impact how the refresh algorithm
+ * operates are specified as part of the producer/consumer/broker configuration
+ * and are as follows. See the documentation for these properties elsewhere for
+ * details.
+ * <p>
+ * <table>
+ * <tr>
+ * <th>Producer/Consumer/Broker Configuration Property</th>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.window.factor}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.window.jitter}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.min.period.seconds}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.min.buffer.seconds}</td>
+ * </tr>
+ * </table>
+ * <p>
+ * When a broker accepts a SASL/OAUTHBEARER connection the instance of the
+ * builtin {@code SaslServer} implementation asks its configured
+ * {@link AuthenticateCallbackHandler} implementation to handle an instance of
+ * {@link OAuthBearerValidatorCallback} constructed with the OAuth 2 Bearer
+ * Token's compact serialization and return an instance of
+ * {@link OAuthBearerToken} if the value validates. A default, builtin
+ * {@link AuthenticateCallbackHandler} implementation validates an unsecured
+ * token as defined by these JAAS module options:
+ * <p>
+ * <table>
+ * <tr>
+ * <th>JAAS Module Option for Unsecured Token Validation</th>
+ * <th>Documentation</th>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredValidatorPrincipalClaimName="value"}</td>
+ * <td>Set to a non-empty value if you wish a particular {@code String} claim
+ * holding a principal name to be checked for existence; the default is to check
+ * for the existence of the '{@code sub}' claim.</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredValidatorScopeClaimName="value"}</td>
+ * <td>Set to a custom claim name if you wish the name of the {@code String} or
+ * {@code String List} claim holding any token scope to be something other than
+ * '{@code scope}'.</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredValidatorRequiredScope="value"}</td>
+ * <td>Set to a space-delimited list of scope values if you wish the
+ * {@code String/String List} claim holding the token scope to be checked to
+ * make sure it contains certain values.</td>
+ * </tr>
+ * <tr>
+ * <td>{@code unsecuredValidatorAllowableClockSkewMs="value"}</td>
+ * <td>Set to a positive integer value if you wish to allow up to some number of
+ * positive milliseconds of clock skew (the default is 0).</td>
+ * </tr>
+ * </table>
+ * <p>
+ * Here is a typical, basic JAAS configuration for a broker leveraging unsecured
+ * SASL/OAUTHBEARER validation:
+ * 
+ * <pre>
+ * KafkaServer {
+ *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
+ *      unsecuredLoginStringClaim_sub="thePrincipalName";
+ * };
+ * </pre>
+ * 
+ * Production use cases will require writing an implementation of
+ * {@link AuthenticateCallbackHandler} that can handle an instance of
+ * {@link OAuthBearerValidatorCallback} and declaring it via the
+ * {@code listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class}
+ * broker configuration option.
+ * <p>
+ * The builtin {@code SaslServer} implementation for SASL/OAUTHBEARER in Kafka
+ * makes the instance of {@link OAuthBearerToken} available upon successful
+ * authentication via the negotiated property "{@code OAUTHBEARER.token}"; the
+ * token could be used in a custom authorizer (to authorize based on JWT claims
+ * rather than ACLs, for example).
+ * <p>
+ * This implementation's {@code logout()} method will logout the specific token
+ * that this instance logged in if it's {@code Subject} instance is shared
+ * across multiple {@code LoginContext}s and there happen to be multiple tokens
+ * on the {@code Subject}. This functionality is useful because it means a new
+ * token with a longer lifetime can be created before a soon-to-expire token is
+ * actually logged out. Otherwise, if multiple simultaneous tokens were not
+ * supported like this, the soon-to-be expired token would have to be logged out
+ * first, and then if the new token could not be retrieved (maybe the
+ * authorization server is temporarily unavailable, for example) the client
+ * would be left without a token and would be unable to create new connections.
+ * Better to mitigate this possibility by leaving the existing token (which
+ * still has some lifetime left) in place until a new replacement token is
+ * actually retrieved. This implementation supports this.
+ * 
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
+ */
+public class OAuthBearerLoginModule implements LoginModule {
+    /**
+     * The SASL Mechanism name for OAuth 2: {@code OAUTHBEARER}
+     */
+    public static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginModule.class);
+    private Subject subject = null;
+    private AuthenticateCallbackHandler callbackHandler = null;
+    private OAuthBearerToken tokenRequiringCommit = null;
+    private OAuthBearerToken myCommittedToken = null;
+
+    static {
+        OAuthBearerSaslClientProvider.initialize(); // not part of public API
+        OAuthBearerSaslServerProvider.initialize(); // not part of public API
+    }
+
+    @Override
+    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState,
+            Map<String, ?> options) {
+        this.subject = Objects.requireNonNull(subject);
+        if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler))
+            throw new IllegalArgumentException(String.format("Callback handler must be castable to %s: %s",
+                    AuthenticateCallbackHandler.class.getName(), callbackHandler.getClass().getName()));
+        this.callbackHandler = (AuthenticateCallbackHandler) callbackHandler;
+    }
+
+    @Override
+    public boolean login() throws LoginException {
+        if (tokenRequiringCommit != null)
+            throw new IllegalStateException(String.format(
+                    "Already have an uncommitted token with private credential token count=%d", committedTokenCount()));
+        if (myCommittedToken != null)
+            throw new IllegalStateException(String.format(
+                    "Already have a committed token with private credential token count=%d; must login on another login context or logout here first before reusing the same login context",
+                    committedTokenCount()));
+        OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+        try {
+            callbackHandler.handle(new Callback[] {callback});
+        } catch (IOException | UnsupportedCallbackException e) {
+            log.error(e.getMessage(), e);
+            throw new LoginException("An internal error occurred");
+        }
+        tokenRequiringCommit = callback.token();
+        if (tokenRequiringCommit == null) {
+            log.info(String.format("Login failed: %s : %s (URI=%s)", callback.errorCode(), callback.errorDescription(),
+                    callback.errorUri()));
+            throw new LoginException(callback.errorDescription());
+        }
+        log.info("Login succeeded; invoke commit() to commit it; current committed token count={}",
+                committedTokenCount());
+        return true;
+    }
+
+    @Override
+    public boolean logout() {
+        if (tokenRequiringCommit != null)
+            throw new IllegalStateException(
+                    "Cannot call logout() immediately after login(); need to first invoke commit() or abort()");
+        if (myCommittedToken == null) {
+            if (log.isDebugEnabled())
+                log.debug("Nothing here to log out");
+            return false;
+        }
+        log.info("Logging out my token; current committed token count = {}", committedTokenCount());
+        for (Iterator<Object> iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext();) {
+            Object privateCredential = iterator.next();
+            if (privateCredential == myCommittedToken) {
+                iterator.remove();
+                myCommittedToken = null;
+                break;
+            }
+        }
+        log.info("Done logging out my token; committed token count is now {}", committedTokenCount());
+        return true;
+    }
+
+    @Override
+    public boolean commit() throws LoginException {
+        if (tokenRequiringCommit == null) {
+            if (log.isDebugEnabled())
+                log.debug("Nothing here to commit");
+            return false;
+        }
+        log.info("Committing my token; current committed token count = {}", committedTokenCount());
+        subject.getPrivateCredentials().add(tokenRequiringCommit);
+        myCommittedToken = tokenRequiringCommit;
+        tokenRequiringCommit = null;
+        log.info("Done committing my token; committed token count is now {}", committedTokenCount());
+        return true;
+    }
+
+    @Override
+    public boolean abort() throws LoginException {
+        if (tokenRequiringCommit != null) {
+            log.info("Login aborted");
+            tokenRequiringCommit = null;
+            return true;
+        }
+        if (log.isDebugEnabled())
+            log.debug("Nothing here to abort");
+        return false;
+    }
+
+    private int committedTokenCount() {
+        return subject.getPrivateCredentials(OAuthBearerToken.class).size();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerToken.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerToken.java
new file mode 100644
index 0000000..ee443ed
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerToken.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import java.util.Set;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The <code>b64token</code> value as defined in
+ * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
+ * 2.1</a> along with the token's specific scope and lifetime and principal
+ * name.
+ * <p>
+ * A network request would be required to re-hydrate an opaque token, and that
+ * could result in (for example) an {@code IOException}, but retrievers for
+ * various attributes ({@link #scope()}, {@link #lifetimeMs()}, etc.) declare no
+ * exceptions. Therefore, if a network request is required for any of these
+ * retriever methods, that request could be performed at construction time so
+ * that the various attributes can be reliably provided thereafter. For example,
+ * a constructor might declare {@code throws IOException} in such a case.
+ * Alternatively, the retrievers could throw unchecked exceptions.
+ * <p>
+ * This interface was introduced in 2.0.0 and, while it feels stable, it could
+ * evolve. We will try to evolve the API in a compatible manner (easier now that
+ * Java 7 and its lack of default methods doesn't have to be supported), but we
+ * reserve the right to make breaking changes in minor releases, if necessary.
+ * We will update the {@code InterfaceStability} annotation and this notice once
+ * the API is considered stable.
+ * 
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
+ *      Section 1.4</a> and
+ *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
+ *      Section 2.1</a>
+ */
+@InterfaceStability.Evolving
+public interface OAuthBearerToken {
+    /**
+     * The <code>b64token</code> value as defined in
+     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
+     * 2.1</a>
+     * 
+     * @return <code>b64token</code> value as defined in
+     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
+     *         Section 2.1</a>
+     */
+    String value();
+
+    /**
+     * The token's scope of access, as per
+     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
+     * 1.4</a>
+     * 
+     * @return the token's (always non-null but potentially empty) scope of access,
+     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
+     *         6749 Section 1.4</a>. Note that all values in the returned set will
+     *         be trimmed of preceding and trailing whitespace, and the result will
+     *         never contain the empty string.
+     */
+    Set<String> scope();
+
+    /**
+     * The token's lifetime, expressed as the number of milliseconds since the
+     * epoch, as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
+     * 6749 Section 1.4</a>
+     * 
+     * @return the token'slifetime, expressed as the number of milliseconds since
+     *         the epoch, as per
+     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
+     *         Section 1.4</a>.
+     */
+    long lifetimeMs();
+
+    /**
+     * The name of the principal to which this credential applies
+     * 
+     * @return the always non-null/non-empty principal name
+     */
+    String principalName();
+
+    /**
+     * When the credential became valid, in terms of the number of milliseconds
+     * since the epoch, if known, otherwise null. An expiring credential may not
+     * necessarily indicate when it was created -- just when it expires -- so we
+     * need to support a null return value here.
+     * 
+     * @return the time when the credential became valid, in terms of the number of
+     *         milliseconds since the epoch, if known, otherwise null
+     */
+    Long startTimeMs();
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
new file mode 100644
index 0000000..62ce492
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import java.util.Objects;
+
+import javax.security.auth.callback.Callback;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * A {@code Callback} for use by the {@code SaslClient} and {@code Login}
+ * implementations when they require an OAuth 2 bearer token. Callback handlers
+ * should use the {@link #error(String, String, String)} method to communicate
+ * errors returned by the authorization server as per
+ * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
+ * 2.0 Authorization Framework</a>. Callback handlers should communicate other
+ * problems by raising an {@code IOException}.
+ * <p>
+ * This class was introduced in 2.0.0 and, while it feels stable, it could
+ * evolve. We will try to evolve the API in a compatible manner, but we reserve
+ * the right to make breaking changes in minor releases, if necessary. We will
+ * update the {@code InterfaceStability} annotation and this notice once the API
+ * is considered stable.
+ */
+@InterfaceStability.Evolving
+public class OAuthBearerTokenCallback implements Callback {
+    private OAuthBearerToken token = null;
+    private String errorCode = null;
+    private String errorDescription = null;
+    private String errorUri = null;
+
+    /**
+     * Return the (potentially null) token
+     * 
+     * @return the (potentially null) token
+     */
+    public OAuthBearerToken token() {
+        return token;
+    }
+
+    /**
+     * Return the optional (but always non-empty if not null) error code as per
+     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
+     * 2.0 Authorization Framework</a>.
+     * 
+     * @return the optional (but always non-empty if not null) error code
+     */
+    public String errorCode() {
+        return errorCode;
+    }
+
+    /**
+     * Return the (potentially null) error description as per
+     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
+     * 2.0 Authorization Framework</a>.
+     * 
+     * @return the (potentially null) error description
+     */
+    public String errorDescription() {
+        return errorDescription;
+    }
+
+    /**
+     * Return the (potentially null) error URI as per
+     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
+     * 2.0 Authorization Framework</a>.
+     * 
+     * @return the (potentially null) error URI
+     */
+    public String errorUri() {
+        return errorUri;
+    }
+
+    /**
+     * Set the token. All error-related values are cleared.
+     * 
+     * @param token
+     *            the mandatory token to set
+     */
+    public void token(OAuthBearerToken token) {
+        this.token = Objects.requireNonNull(token);
+        this.errorCode = null;
+        this.errorDescription = null;
+        this.errorUri = null;
+    }
+
+    /**
+     * Set the error values as per
+     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
+     * 2.0 Authorization Framework</a>. Any token is cleared.
+     * 
+     * @param errorCode
+     *            the mandatory error code to set
+     * @param errorDescription
+     *            the optional error description to set
+     * @param errorUri
+     *            the optional error URI to set
+     */
+    public void error(String errorCode, String errorDescription, String errorUri) {
+        if (Objects.requireNonNull(errorCode).isEmpty())
+            throw new IllegalArgumentException("error code must not be empty");
+        this.errorCode = errorCode;
+        this.errorDescription = errorDescription;
+        this.errorUri = errorUri;
+        this.token = null;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallback.java
new file mode 100644
index 0000000..36bcf08
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallback.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import java.util.Objects;
+
+import javax.security.auth.callback.Callback;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * A {@code Callback} for use by the {@code SaslServer} implementation when it
+ * needs to provide an OAuth 2 bearer token compact serialization for
+ * validation. Callback handlers should use the
+ * {@link #error(String, String, String)} method to communicate errors back to
+ * the SASL Client as per
+ * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
+ * 2.0 Authorization Framework</a> and the <a href=
+ * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
+ * OAuth Extensions Error Registry</a>. Callback handlers should communicate
+ * other problems by raising an {@code IOException}.
+ * <p>
+ * This class was introduced in 2.0.0 and, while it feels stable, it could
+ * evolve. We will try to evolve the API in a compatible manner, but we reserve
+ * the right to make breaking changes in minor releases, if necessary. We will
+ * update the {@code InterfaceStability} annotation and this notice once the API
+ * is considered stable.
+ */
+@InterfaceStability.Evolving
+public class OAuthBearerValidatorCallback implements Callback {
+    private final String tokenValue;
+    private OAuthBearerToken token = null;
+    private String errorStatus = null;
+    private String errorScope = null;
+    private String errorOpenIDConfiguration = null;
+
+    /**
+     * Constructor
+     * 
+     * @param tokenValue
+     *            the mandatory/non-blank token value
+     */
+    public OAuthBearerValidatorCallback(String tokenValue) {
+        if (Objects.requireNonNull(tokenValue).isEmpty())
+            throw new IllegalArgumentException("token value must not be empty");
+        this.tokenValue = tokenValue;
+    }
+
+    /**
+     * Return the (always non-null) token value
+     * 
+     * @return the (always non-null) token value
+     */
+    public String tokenValue() {
+        return tokenValue;
+    }
+
+    /**
+     * Return the (potentially null) token
+     * 
+     * @return the (potentially null) token
+     */
+    public OAuthBearerToken token() {
+        return token;
+    }
+
+    /**
+     * Return the (potentially null) error status value as per
+     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
+     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>
+     * and the <a href=
+     * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
+     * OAuth Extensions Error Registry</a>.
+     * 
+     * @return the (potentially null) error status value
+     */
+    public String errorStatus() {
+        return errorStatus;
+    }
+
+    /**
+     * Return the (potentially null) error scope value as per
+     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
+     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
+     * 
+     * @return the (potentially null) error scope value
+     */
+    public String errorScope() {
+        return errorScope;
+    }
+
+    /**
+     * Return the (potentially null) error openid-configuration value as per
+     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
+     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
+     * 
+     * @return the (potentially null) error openid-configuration value
+     */
+    public String errorOpenIDConfiguration() {
+        return errorOpenIDConfiguration;
+    }
+
+    /**
+     * Set the token. The token value is unchanged and is expected to match the
+     * provided token's value. All error values are cleared.
+     * 
+     * @param token
+     *            the mandatory token to set
+     */
+    public void token(OAuthBearerToken token) {
+        this.token = Objects.requireNonNull(token);
+        this.errorStatus = null;
+        this.errorScope = null;
+        this.errorOpenIDConfiguration = null;
+    }
+
+    /**
+     * Set the error values as per
+     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
+     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
+     * Any token is cleared.
+     * 
+     * @param errorStatus
+     *            the mandatory error status value from the <a href=
+     *            "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
+     *            OAuth Extensions Error Registry</a> to set
+     * @param errorScope
+     *            the optional error scope value to set
+     * @param errorOpenIDConfiguration
+     *            the optional error openid-configuration value to set
+     */
+    public void error(String errorStatus, String errorScope, String errorOpenIDConfiguration) {
+        if (Objects.requireNonNull(errorStatus).isEmpty())
+            throw new IllegalArgumentException("error status must not be empty");
+        this.errorStatus = errorStatus;
+        this.errorScope = errorScope;
+        this.errorOpenIDConfiguration = errorOpenIDConfiguration;
+        this.token = null;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerRefreshingLogin.java
new file mode 100644
index 0000000..303188c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerRefreshingLogin.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.Login;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.internal.expiring.ExpiringCredential;
+import org.apache.kafka.common.security.oauthbearer.internal.expiring.ExpiringCredentialRefreshConfig;
+import org.apache.kafka.common.security.oauthbearer.internal.expiring.ExpiringCredentialRefreshingLogin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for refreshing logins for both Kafka client and
+ * server when the credential is an OAuth 2 bearer token communicated over
+ * SASL/OAUTHBEARER. An OAuth 2 bearer token has a limited lifetime, and an
+ * instance of this class periodically refreshes it so that the client can
+ * create new connections to brokers on an ongoing basis.
+ * <p>
+ * This class does not need to be explicitly set via the
+ * {@code sasl.login.class} client configuration property or the
+ * {@code listener.name.sasl_[plaintext|ssl].oauthbearer.sasl.login.class}
+ * broker configuration property when the SASL mechanism is OAUTHBEARER; it is
+ * automatically set by default in that case.
+ * <p>
+ * The parameters that impact how the refresh algorithm operates are specified
+ * as part of the producer/consumer/broker configuration and are as follows. See
+ * the documentation for these properties elsewhere for details.
+ * <table>
+ * <tr>
+ * <th>Producer/Consumer/Broker Configuration Property</th>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.window.factor}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.window.jitter}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.min.period.seconds}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code sasl.login.refresh.min.buffer.seconds}</td>
+ * </tr>
+ * </table>
+ * 
+ * @see OAuthBearerLoginModule
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
+ * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
+ */
+public class OAuthBearerRefreshingLogin implements Login {
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerRefreshingLogin.class);
+    private ExpiringCredentialRefreshingLogin expiringCredentialRefreshingLogin = null;
+
+    @Override
+    public void configure(Map<String, ?> configs, String contextName, Configuration configuration,
+            AuthenticateCallbackHandler loginCallbackHandler) {
+        /*
+         * Specify this class as the one to synchronize on so that only one OAuth 2
+         * Bearer Token is refreshed at a given time. Specify null if we don't mind
+         * multiple simultaneously refreshes. Refreshes happen on the order of minutes
+         * rather than seconds or milliseconds, and there are typically minutes of
+         * lifetime remaining when the refresh occurs, so serializing them seems
+         * reasonable.
+         */
+        Class<OAuthBearerRefreshingLogin> classToSynchronizeOnPriorToRefresh = OAuthBearerRefreshingLogin.class;
+        expiringCredentialRefreshingLogin = new ExpiringCredentialRefreshingLogin(contextName, configuration,
+                new ExpiringCredentialRefreshConfig(configs, true), loginCallbackHandler,
+                classToSynchronizeOnPriorToRefresh) {
+            @Override
+            public ExpiringCredential expiringCredential() {
+                Set<OAuthBearerToken> privateCredentialTokens = expiringCredentialRefreshingLogin.subject()
+                        .getPrivateCredentials(OAuthBearerToken.class);
+                if (privateCredentialTokens.isEmpty())
+                    return null;
+                final OAuthBearerToken token = privateCredentialTokens.iterator().next();
+                if (log.isDebugEnabled())
+                    log.debug("Found expiring credential with principal '{}'.", token.principalName());
+                return new ExpiringCredential() {
+                    @Override
+                    public String principalName() {
+                        return token.principalName();
+                    }
+
+                    @Override
+                    public Long startTimeMs() {
+                        return token.startTimeMs();
+                    }
+
+                    @Override
+                    public long expireTimeMs() {
+                        return token.lifetimeMs();
+                    }
+
+                    @Override
+                    public Long absoluteLastRefreshTimeMs() {
+                        return null;
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public void close() {
+        if (expiringCredentialRefreshingLogin != null)
+            expiringCredentialRefreshingLogin.close();
+    }
+
+    @Override
+    public Subject subject() {
+        return expiringCredentialRefreshingLogin != null ? expiringCredentialRefreshingLogin.subject() : null;
+    }
+
+    @Override
+    public String serviceName() {
+        return expiringCredentialRefreshingLogin != null ? expiringCredentialRefreshingLogin.serviceName() : null;
+    }
+
+    @Override
+    public synchronized LoginContext login() throws LoginException {
+        if (expiringCredentialRefreshingLogin != null)
+            return expiringCredentialRefreshingLogin.login();
+        throw new LoginException("Login was not configured properly");
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClient.java
new file mode 100644
index 0000000..bb6ff0a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClient.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code SaslClient} implementation for SASL/OAUTHBEARER in Kafka. This
+ * implementation requires an instance of {@code AuthenticateCallbackHandler}
+ * that can handle an instance of {@link OAuthBearerTokenCallback} and return
+ * the {@link OAuthBearerToken} generated by the {@code login()} event on the
+ * {@code LoginContext}.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750,
+ *      Section 2.1</a>
+ *
+ */
+public class OAuthBearerSaslClient implements SaslClient {
+    static final byte BYTE_CONTROL_A = (byte) 0x01;
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerSaslClient.class);
+    private final CallbackHandler callbackHandler;
+
+    enum State {
+        SEND_CLIENT_FIRST_MESSAGE, RECEIVE_SERVER_FIRST_MESSAGE, RECEIVE_SERVER_MESSAGE_AFTER_FAILURE, COMPLETE, FAILED
+    };
+
+    private State state;
+
+    public OAuthBearerSaslClient(AuthenticateCallbackHandler callbackHandler) {
+        this.callbackHandler = Objects.requireNonNull(callbackHandler);
+        setState(State.SEND_CLIENT_FIRST_MESSAGE);
+    }
+
+    public CallbackHandler callbackHandler() {
+        return callbackHandler;
+    }
+
+    @Override
+    public String getMechanismName() {
+        return OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+    }
+
+    @Override
+    public boolean hasInitialResponse() {
+        return true;
+    }
+
+    @Override
+    public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
+        try {
+            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+            switch (state) {
+                case SEND_CLIENT_FIRST_MESSAGE:
+                    if (challenge != null && challenge.length != 0)
+                        throw new SaslException("Expected empty challenge");
+                    callbackHandler().handle(new Callback[] {callback});
+                    setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
+                    return String.format("n,,auth=Bearer %s", callback.token().value())
+                            .getBytes(StandardCharsets.UTF_8);
+                case RECEIVE_SERVER_FIRST_MESSAGE:
+                    if (challenge != null && challenge.length != 0) {
+                        String jsonErrorResponse = new String(challenge, StandardCharsets.UTF_8);
+                        if (log.isDebugEnabled())
+                            log.debug("Sending %%x01 response to server after receiving an error: {}",
+                                    jsonErrorResponse);
+                        setState(State.RECEIVE_SERVER_MESSAGE_AFTER_FAILURE);
+                        return new byte[] {BYTE_CONTROL_A};
+                    }
+                    callbackHandler().handle(new Callback[] {callback});
+                    if (log.isDebugEnabled())
+                        log.debug("Successfully authenticated as {}", callback.token().principalName());
+                    setState(State.COMPLETE);
+                    return null;
+                default:
+                    throw new IllegalSaslStateException("Unexpected challenge in Sasl client state " + state);
+            }
+        } catch (SaslException e) {
+            setState(State.FAILED);
+            throw e;
+        } catch (IOException | UnsupportedCallbackException e) {
+            setState(State.FAILED);
+            throw new SaslException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean isComplete() {
+        return state == State.COMPLETE;
+    }
+
+    @Override
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(incoming, offset, offset + len);
+    }
+
+    @Override
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(outgoing, offset, offset + len);
+    }
+
+    @Override
+    public Object getNegotiatedProperty(String propName) {
+        if (!isComplete())
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return null;
+    }
+
+    @Override
+    public void dispose() throws SaslException {
+    }
+
+    private void setState(State state) {
+        log.debug("Setting SASL/{} client state to {}", OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, state);
+        this.state = state;
+    }
+
+    public static class OAuthBearerSaslClientFactory implements SaslClientFactory {
+        @Override
+        public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
+                String serverName, Map<String, ?> props, CallbackHandler callbackHandler) throws SaslException {
+            String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
+            for (String mechanism : mechanisms) {
+                for (int i = 0; i < mechanismNamesCompatibleWithPolicy.length; i++) {
+                    if (mechanismNamesCompatibleWithPolicy[i].equals(mechanism)) {
+                        if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler))
+                            throw new IllegalArgumentException(String.format(
+                                    "Callback handler must be castable to %s: %s",
+                                    AuthenticateCallbackHandler.class.getName(), callbackHandler.getClass().getName()));
+                        return new OAuthBearerSaslClient((AuthenticateCallbackHandler) callbackHandler);
+                    }
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String[] getMechanismNames(Map<String, ?> props) {
+            return OAuthBearerSaslServer.mechanismNamesCompatibleWithPolicy(props);
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClientCallbackHandler.java
new file mode 100644
index 0000000..d406a8a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClientCallbackHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal;
+
+import java.io.IOException;
+import java.security.AccessController;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+
+/**
+ * An implementation of {@code AuthenticateCallbackHandler} that recognizes
+ * {@link OAuthBearerTokenCallback} and retrieves OAuth 2 Bearer Token that was
+ * created when the {@code OAuthBearerLoginModule} logged in by looking for an
+ * instance of {@link OAuthBearerToken} in the {@code Subject}'s private
+ * credentials.
+ * <p>
+ * Use of this class is configured automatically and does not need to be
+ * explicitly set via the {@code sasl.client.callback.handler.class}
+ * configuration property.
+ */
+public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbackHandler {
+    private boolean configured = false;
+
+    /**
+     * Return true if this instance has been configured, otherwise false
+     * 
+     * @return true if this instance has been configured, otherwise false
+     */
+    public boolean configured() {
+        return configured;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+        configured = true;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        if (!configured())
+            throw new IllegalStateException("Callback handler not configured");
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerTokenCallback)
+                handleCallback((OAuthBearerTokenCallback) callback);
+            else
+                throw new UnsupportedCallbackException(callback);
+        }
+    }
+
+    @Override
+    public void close() {
+        // empty
+    }
+
+    private void handleCallback(OAuthBearerTokenCallback callback) throws IOException {
+        if (callback.token() != null)
+            throw new IllegalArgumentException("Callback had a token already");
+        Subject subject = Subject.getSubject(AccessController.getContext());
+        Set<OAuthBearerToken> privateCredentials = subject != null
+                ? subject.getPrivateCredentials(OAuthBearerToken.class)
+                : Collections.<OAuthBearerToken>emptySet();
+        if (privateCredentials.size() != 1)
+            throw new IOException(
+                    String.format("Unable to find OAuth Bearer token in Subject's private credentials (size=%d)",
+                            privateCredentials.size()));
+        callback.token(privateCredentials.iterator().next());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClientProvider.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClientProvider.java
new file mode 100644
index 0000000..b292b31
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslClientProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal;
+
+import java.security.Provider;
+import java.security.Security;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslClient.OAuthBearerSaslClientFactory;
+
+public class OAuthBearerSaslClientProvider extends Provider {
+    private static final long serialVersionUID = 1L;
+
+    protected OAuthBearerSaslClientProvider() {
+        super("SASL/OAUTHBEARER Client Provider", 1.0, "SASL/OAUTHBEARER Client Provider for Kafka");
+        put("SaslClientFactory." + OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                OAuthBearerSaslClientFactory.class.getName());
+    }
+
+    public static void initialize() {
+        Security.addProvider(new OAuthBearerSaslClientProvider());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServer.java
new file mode 100644
index 0000000..f84517d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServer.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code SaslServer} implementation for SASL/OAUTHBEARER in Kafka. An instance
+ * of {@link OAuthBearerToken} is available upon successful authentication via
+ * the negotiated property "{@code OAUTHBEARER.token}"; the token could be used
+ * in a custom authorizer (to authorize based on JWT claims rather than ACLs,
+ * for example).
+ */
+public class OAuthBearerSaslServer implements SaslServer {
+    private static final String INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE = "Invalid OAUTHBEARER client first message";
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerSaslServer.class);
+    private static final String NEGOTIATED_PROPERTY_KEY_TOKEN = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM + ".token";
+    private static final String INTERNAL_ERROR_ON_SERVER = "Authentication could not be performed due to an internal error on the server";
+    private static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+";
+    private static final Pattern CLIENT_INITIAL_RESPONSE_PATTERN = Pattern.compile(
+            String.format("n,(a=(?<authzid>%s))?,auth=(?<scheme>[\\w]+)[ ]+(?<token>[-_\\.a-zA-Z0-9]+)", SASLNAME));
+
+    private final AuthenticateCallbackHandler callbackHandler;
+
+    private boolean complete;
+    private OAuthBearerToken tokenForNegotiatedProperty = null;
+    private String errorMessage = null;
+
+    public OAuthBearerSaslServer(CallbackHandler callbackHandler) {
+        if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler))
+            throw new IllegalArgumentException(String.format("Callback handler must be castable to %s: %s",
+                    AuthenticateCallbackHandler.class.getName(), callbackHandler.getClass().getName()));
+        this.callbackHandler = (AuthenticateCallbackHandler) callbackHandler;
+    }
+
+    /**
+     * @throws SaslAuthenticationException
+     *             if access token cannot be validated
+     *             <p>
+     *             <b>Note:</b> This method may throw
+     *             {@link SaslAuthenticationException} to provide custom error
+     *             messages to clients. But care should be taken to avoid including
+     *             any information in the exception message that should not be
+     *             leaked to unauthenticated clients. It may be safer to throw
+     *             {@link SaslException} in some cases so that a standard error
+     *             message is returned to clients.
+     *             </p>
+     */
+    @Override
+    public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
+        if (response.length == 1 && response[0] == OAuthBearerSaslClient.BYTE_CONTROL_A && errorMessage != null) {
+            if (log.isDebugEnabled())
+                log.debug("Received %x01 response from client after it received our error");
+            throw new SaslAuthenticationException(errorMessage);
+        }
+        errorMessage = null;
+        String responseMsg = new String(response, StandardCharsets.UTF_8);
+        Matcher matcher = CLIENT_INITIAL_RESPONSE_PATTERN.matcher(responseMsg);
+        if (!matcher.matches()) {
+            if (log.isDebugEnabled())
+                log.debug(INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE);
+            throw new SaslException(INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE);
+        }
+        String authzid = matcher.group("authzid");
+        String authorizationId = authzid != null ? authzid : "";
+        if (!"bearer".equalsIgnoreCase(matcher.group("scheme"))) {
+            String msg = String.format("Invalid scheme in OAUTHBEARER client first message: %s",
+                    matcher.group("scheme"));
+            if (log.isDebugEnabled())
+                log.debug(msg);
+            throw new SaslException(msg);
+        }
+        String tokenValue = matcher.group("token");
+        return process(tokenValue, authorizationId);
+    }
+
+    @Override
+    public String getAuthorizationID() {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return tokenForNegotiatedProperty.principalName();
+    }
+
+    @Override
+    public String getMechanismName() {
+        return OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+    }
+
+    @Override
+    public Object getNegotiatedProperty(String propName) {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return NEGOTIATED_PROPERTY_KEY_TOKEN.equals(propName) ? tokenForNegotiatedProperty : null;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return complete;
+    }
+
+    @Override
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(incoming, offset, offset + len);
+    }
+
+    @Override
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(outgoing, offset, offset + len);
+    }
+
+    @Override
+    public void dispose() throws SaslException {
+        complete = false;
+        tokenForNegotiatedProperty = null;
+    }
+
+    private byte[] process(String tokenValue, String authorizationId) throws SaslException {
+        OAuthBearerValidatorCallback callback = new OAuthBearerValidatorCallback(tokenValue);
+        try {
+            callbackHandler.handle(new Callback[] {callback});
+        } catch (IOException | UnsupportedCallbackException e) {
+            String msg = String.format("%s: %s", INTERNAL_ERROR_ON_SERVER, e.getMessage());
+            if (log.isDebugEnabled())
+                log.debug(msg, e);
+            throw new SaslException(msg);
+        }
+        OAuthBearerToken token = callback.token();
+        if (token == null) {
+            errorMessage = jsonErrorResponse(callback.errorStatus(), callback.errorScope(),
+                    callback.errorOpenIDConfiguration());
+            if (log.isDebugEnabled())
+                log.debug(errorMessage);
+            return errorMessage.getBytes(StandardCharsets.UTF_8);
+        }
+        /*
+         * We support the client specifying an authorization ID as per the SASL
+         * specification, but it must match the principal name if it is specified.
+         */
+        if (!authorizationId.isEmpty() && !authorizationId.equals(token.principalName()))
+            throw new SaslAuthenticationException(String.format(
+                    "Authentication failed: Client requested an authorization id (%s) that is different from the token's principal name (%s)",
+                    authorizationId, token.principalName()));
+        tokenForNegotiatedProperty = token;
+        complete = true;
+        if (log.isDebugEnabled())
+            log.debug("Successfully authenticate User={}", token.principalName());
+        return new byte[0];
+    }
+
+    private static String jsonErrorResponse(String errorStatus, String errorScope, String errorOpenIDConfiguration) {
+        String jsonErrorResponse = String.format("{\"status\":\"%s\"", errorStatus);
+        if (errorScope != null)
+            jsonErrorResponse = String.format("%s, \"scope\":\"%s\"", jsonErrorResponse, errorScope);
+        if (errorOpenIDConfiguration != null)
+            jsonErrorResponse = String.format("%s, \"openid-configuration\":\"%s\"", jsonErrorResponse,
+                    errorOpenIDConfiguration);
+        jsonErrorResponse = String.format("%s}", jsonErrorResponse);
+        return jsonErrorResponse;
+    }
+
+    public static String[] mechanismNamesCompatibleWithPolicy(Map<String, ?> props) {
+        return props != null && "true".equals(String.valueOf(props.get(Sasl.POLICY_NOPLAINTEXT))) ? new String[] {}
+                : new String[] {OAuthBearerLoginModule.OAUTHBEARER_MECHANISM};
+    }
+
+    public static class OAuthBearerSaslServerFactory implements SaslServerFactory {
+        @Override
+        public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
+                CallbackHandler callbackHandler) throws SaslException {
+            String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
+            for (int i = 0; i < mechanismNamesCompatibleWithPolicy.length; i++) {
+                if (mechanismNamesCompatibleWithPolicy[i].equals(mechanism)) {
+                    return new OAuthBearerSaslServer(callbackHandler);
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String[] getMechanismNames(Map<String, ?> props) {
+            return OAuthBearerSaslServer.mechanismNamesCompatibleWithPolicy(props);
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServerProvider.java
new file mode 100644
index 0000000..cb6a94d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServerProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal;
+
+import java.security.Provider;
+import java.security.Security;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslServer.OAuthBearerSaslServerFactory;
+
+public class OAuthBearerSaslServerProvider extends Provider {
+    private static final long serialVersionUID = 1L;
+
+    protected OAuthBearerSaslServerProvider() {
+        super("SASL/OAUTHBEARER Server Provider", 1.0, "SASL/OAUTHBEARER Server Provider for Kafka");
+        put("SaslServerFactory." + OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                OAuthBearerSaslServerFactory.class.getName());
+    }
+
+    public static void initialize() {
+        Security.addProvider(new OAuthBearerSaslServerProvider());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredential.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredential.java
new file mode 100644
index 0000000..472e980
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredential.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.expiring;
+
+/**
+ * A credential that expires and that can potentially be refreshed
+ * 
+ * @see ExpiringCredentialRefreshingLogin
+ */
+public interface ExpiringCredential {
+    /**
+     * The name of the principal to which this credential applies (used only for
+     * logging)
+     * 
+     * @return the always non-null/non-empty principal name
+     */
+    String principalName();
+
+    /**
+     * When the credential became valid, in terms of the number of milliseconds
+     * since the epoch, if known, otherwise null. An expiring credential may not
+     * necessarily indicate when it was created -- just when it expires -- so we
+     * need to support a null return value here.
+     * 
+     * @return the time when the credential became valid, in terms of the number of
+     *         milliseconds since the epoch, if known, otherwise null
+     */
+    Long startTimeMs();
+
+    /**
+     * When the credential expires, in terms of the number of milliseconds since the
+     * epoch. All expiring credentials by definition must indicate their expiration
+     * time -- thus, unlike other methods, we do not support a null return value
+     * here.
+     * 
+     * @return the time when the credential expires, in terms of the number of
+     *         milliseconds since the epoch
+     */
+    long expireTimeMs();
+
+    /**
+     * The point after which the credential can no longer be refreshed, in terms of
+     * the number of milliseconds since the epoch, if any, otherwise null. Some
+     * expiring credentials can be refreshed over and over again without limit, so
+     * we support a null return value here.
+     * 
+     * @return the point after which the credential can no longer be refreshed, in
+     *         terms of the number of milliseconds since the epoch, if any,
+     *         otherwise null
+     */
+    Long absoluteLastRefreshTimeMs();
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshConfig.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshConfig.java
new file mode 100644
index 0000000..02c264b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshConfig.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.expiring;
+
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.kafka.common.config.SaslConfigs;
+
+/**
+ * Immutable refresh-related configuration for expiring credentials that can be
+ * parsed from a producer/consumer/broker config.
+ */
+public class ExpiringCredentialRefreshConfig {
+    private final double loginRefreshWindowFactor;
+    private final double loginRefreshWindowJitter;
+    private final short loginRefreshMinPeriodSeconds;
+    private final short loginRefreshBufferSeconds;
+    private final boolean loginRefreshReloginAllowedBeforeLogout;
+
+    /**
+     * Constructor based on producer/consumer/broker configs and the indicated value
+     * for whether or not client relogin is allowed before logout
+     * 
+     * @param configs
+     *            the mandatory (but possibly empty) producer/consumer/broker
+     *            configs upon which to build this instance
+     * @param clientReloginAllowedBeforeLogout
+     *            if the {@code LoginModule} and {@code SaslClient} implementations
+     *            support multiple simultaneous login contexts on a single
+     *            {@code Subject} at the same time. If true, then upon refresh,
+     *            logout will only be invoked on the original {@code LoginContext}
+     *            after a new one successfully logs in. This can be helpful if the
+     *            original credential still has some lifetime left when an attempt
+     *            to refresh the credential fails; the client will still be able to
+     *            create new connections as long as the original credential remains
+     *            valid. Otherwise, if logout is immediately invoked prior to
+     *            relogin, a relogin failure leaves the client without the ability
+     *            to connect until relogin does in fact succeed.
+     */
+    public ExpiringCredentialRefreshConfig(Map<String, ?> configs, boolean clientReloginAllowedBeforeLogout) {
+        Objects.requireNonNull(configs);
+        this.loginRefreshWindowFactor = (Double) configs.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR);
+        this.loginRefreshWindowJitter = (Double) configs.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER);
+        this.loginRefreshMinPeriodSeconds = (Short) configs.get(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS);
+        this.loginRefreshBufferSeconds = (Short) configs.get(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS);
+        this.loginRefreshReloginAllowedBeforeLogout = clientReloginAllowedBeforeLogout;
+    }
+
+    /**
+     * Background login refresh thread will sleep until the specified window factor
+     * relative to the credential's total lifetime has been reached, at which time
+     * it will try to refresh the credential.
+     * 
+     * @return the login refresh window factor
+     */
+    public double loginRefreshWindowFactor() {
+        return loginRefreshWindowFactor;
+    }
+
+    /**
+     * Amount of random jitter added to the background login refresh thread's sleep
+     * time.
+     * 
+     * @return the login refresh window jitter
+     */
+    public double loginRefreshWindowJitter() {
+        return loginRefreshWindowJitter;
+    }
+
+    /**
+     * The desired minimum time between checks by the background login refresh
+     * thread, in seconds
+     * 
+     * @return the desired minimum refresh period, in seconds
+     */
+    public short loginRefreshMinPeriodSeconds() {
+        return loginRefreshMinPeriodSeconds;
+    }
+
+    /**
+     * The amount of buffer time before expiration to maintain when refreshing. If a
+     * refresh is scheduled to occur closer to expiration than the number of seconds
+     * defined here then the refresh will be moved up to maintain as much of the
+     * desired buffer as possible.
+     * 
+     * @return the refresh buffer, in seconds
+     */
+    public short loginRefreshBufferSeconds() {
+        return loginRefreshBufferSeconds;
+    }
+
+    /**
+     * If the LoginModule and SaslClient implementations support multiple
+     * simultaneous login contexts on a single Subject at the same time. If true,
+     * then upon refresh, logout will only be invoked on the original LoginContext
+     * after a new one successfully logs in. This can be helpful if the original
+     * credential still has some lifetime left when an attempt to refresh the
+     * credential fails; the client will still be able to create new connections as
+     * long as the original credential remains valid. Otherwise, if logout is
+     * immediately invoked prior to relogin, a relogin failure leaves the client
+     * without the ability to connect until relogin does in fact succeed.
+     * 
+     * @return true if relogin is allowed prior to discarding an existing
+     *         (presumably unexpired) credential, otherwise false
+     */
+    public boolean loginRefreshReloginAllowedBeforeLogout() {
+        return loginRefreshReloginAllowedBeforeLogout;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshingLogin.java
new file mode 100644
index 0000000..5bd889e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshingLogin.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.expiring;
+
+import java.util.Date;
+import java.util.Objects;
+import java.util.Random;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.Login;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for refreshing logins for both Kafka client and
+ * server when the login is a type that has a limited lifetime/will expire. The
+ * credentials for the login must implement {@link ExpiringCredential}.
+ */
+public abstract class ExpiringCredentialRefreshingLogin {
+    /**
+     * Class that can be overridden for testing
+     */
+    static class LoginContextFactory {
+        public LoginContext createLoginContext(ExpiringCredentialRefreshingLogin expiringCredentialRefreshingLogin)
+                throws LoginException {
+            return new LoginContext(expiringCredentialRefreshingLogin.contextName(),
+                    expiringCredentialRefreshingLogin.subject(), expiringCredentialRefreshingLogin.callbackHandler(),
+                    expiringCredentialRefreshingLogin.configuration());
+        }
+
+        public void refresherThreadStarted() {
+            // empty
+        }
+
+        public void refresherThreadDone() {
+            // empty
+        }
+    }
+
+    private static class ExitRefresherThreadDueToIllegalStateException extends Exception {
+        private static final long serialVersionUID = -6108495378411920380L;
+
+        public ExitRefresherThreadDueToIllegalStateException(String message) {
+            super(message);
+        }
+    }
+
+    private class Refresher implements Runnable {
+        @Override
+        public void run() {
+            log.info("[Principal={}]: Expiring credential re-login thread started.", principalLogText());
+            while (true) {
+                /*
+                 * Refresh thread's main loop. Each expiring credential lives for one iteration
+                 * of the loop. Thread will exit if the loop exits from here.
+                 */
+                long nowMs = currentMs();
+                Long nextRefreshMs = refreshMs(nowMs);
+                if (nextRefreshMs == null) {
+                    loginContextFactory.refresherThreadDone();
+                    return;
+                }
+                log.info("[Principal={}]: Expiring credential re-login sleeping until: {}", principalLogText(),
+                        new Date(nextRefreshMs));
+                time.sleep(nextRefreshMs - nowMs);
+                if (Thread.currentThread().isInterrupted()) {
+                    log.info("[Principal={}]: Expiring credential re-login thread has been interrupted and will exit.",
+                            principalLogText());
+                    loginContextFactory.refresherThreadDone();
+                    return;
+                }
+                while (true) {
+                    /*
+                     * Perform a re-login over and over again with some intervening delay
+                     * unless/until either the refresh succeeds or we are interrupted.
+                     */
+                    try {
+                        reLogin();
+                        break; // success
+                    } catch (ExitRefresherThreadDueToIllegalStateException e) {
+                        log.error(e.getMessage(), e);
+                        loginContextFactory.refresherThreadDone();
+                        return;
+                    } catch (LoginException loginException) {
+                        log.warn(String.format(
+                                "[Principal=%s]: LoginException during login retry; will sleep %d seconds before trying again.",
+                                principalLogText(), DELAY_SECONDS_BEFORE_NEXT_RETRY_WHEN_RELOGIN_FAILS),
+                                loginException);
+                        // Sleep and allow loop to run/try again unless interrupted
+                        time.sleep(DELAY_SECONDS_BEFORE_NEXT_RETRY_WHEN_RELOGIN_FAILS * 1000);
+                        if (Thread.currentThread().isInterrupted()) {
+                            log.error(
+                                    "[Principal={}]: Interrupted while trying to perform a subsequent expiring credential re-login after one or more initial re-login failures: re-login thread exiting now: {}",
+                                    principalLogText(), String.valueOf(loginException.getMessage()));
+                            loginContextFactory.refresherThreadDone();
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ExpiringCredentialRefreshingLogin.class);
+    private static final long DELAY_SECONDS_BEFORE_NEXT_RETRY_WHEN_RELOGIN_FAILS = 10L;
+    private static final Random RNG = new Random();
+    private final Time time;
+    private Thread refresherThread;
+
+    private final LoginContextFactory loginContextFactory;
+    private final String contextName;
+    private final Configuration configuration;
+    private final ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig;
+    private final AuthenticateCallbackHandler callbackHandler;
+
+    // mark volatile due to existence of public subject() method
+    private volatile Subject subject = null;
+    private boolean hasExpiringCredential = false;
+    private String principalName = null;
+    private LoginContext loginContext = null;
+    private ExpiringCredential expiringCredential = null;
+    private final Class<?> mandatoryClassToSynchronizeOnPriorToRefresh;
+
+    public ExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
+            ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
+            AuthenticateCallbackHandler callbackHandler, Class<?> mandatoryClassToSynchronizeOnPriorToRefresh) {
+        this(contextName, configuration, expiringCredentialRefreshConfig, callbackHandler,
+                mandatoryClassToSynchronizeOnPriorToRefresh, new LoginContextFactory(), Time.SYSTEM);
+    }
+
+    public ExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
+            ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
+            AuthenticateCallbackHandler callbackHandler, Class<?> mandatoryClassToSynchronizeOnPriorToRefresh,
+            LoginContextFactory loginContextFactory, Time time) {
+        this.contextName = Objects.requireNonNull(contextName);
+        this.configuration = Objects.requireNonNull(configuration);
+        this.expiringCredentialRefreshConfig = Objects.requireNonNull(expiringCredentialRefreshConfig);
+        this.callbackHandler = callbackHandler;
+        this.mandatoryClassToSynchronizeOnPriorToRefresh = Objects
+                .requireNonNull(mandatoryClassToSynchronizeOnPriorToRefresh);
+        this.loginContextFactory = loginContextFactory;
+        this.time = Objects.requireNonNull(time);
+    }
+
+    public Subject subject() {
+        return subject; // field requires volatile keyword
+    }
+
+    public String contextName() {
+        return contextName;
+    }
+
+    public Configuration configuration() {
+        return configuration;
+    }
+
+    public AuthenticateCallbackHandler callbackHandler() {
+        return callbackHandler;
+    }
+
+    public String serviceName() {
+        return "kafka";
+    }
+
+    /**
+     * Performs login for each login module specified for the login context of this
+     * instance and starts the thread used to periodically re-login.
+     * <p>
+     * The synchronized keyword is not necessary because an implementation of
+     * {@link Login} will delegate to this code (e.g. OAuthBearerRefreshingLogin},
+     * and the {@code login()} method on the delegating class will itself be
+     * synchronized if necessary.
+     */
+    public LoginContext login() throws LoginException {
+        LoginContext tmpLoginContext = loginContextFactory.createLoginContext(this);
+        tmpLoginContext.login();
+        log.info("Successfully logged in.");
+        loginContext = tmpLoginContext;
+        subject = loginContext.getSubject();
+        expiringCredential = expiringCredential();
+        hasExpiringCredential = expiringCredential != null;
+        if (!hasExpiringCredential) {
+            // do not bother with re-logins.
+            log.debug("No Expiring Credential");
+            principalName = null;
+            refresherThread = null;
+            return loginContext;
+        }
+
+        principalName = expiringCredential.principalName();
+
+        // Check for a clock skew problem
+        long expireTimeMs = expiringCredential.expireTimeMs();
+        long nowMs = currentMs();
+        if (nowMs > expireTimeMs) {
+            log.error(
+                    "[Principal={}]: Current clock: {} is later than expiry {}. This may indicate a clock skew problem."
+                            + " Check that this host's and remote host's clocks are in sync. Not starting refresh thread."
+                            + " This process is likely unable to authenticate SASL connections (for example, it is unlikely"
+                            + " to be able to authenticate a connection with a Kafka Broker).",
+                    principalLogText(), new Date(nowMs), new Date(expireTimeMs));
+            return loginContext;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("[Principal={}]: It is an expiring credential", principalLogText());
+
+        /*
+         * Re-login periodically. How often is determined by the expiration date of the
+         * credential and refresh-related configuration values.
+         */
+        refresherThread = KafkaThread.daemon(String.format("kafka-expiring-relogin-thread-%s", principalName),
+                new Refresher());
+        refresherThread.start();
+        loginContextFactory.refresherThreadStarted();
+        return loginContext;
+    }
+
+    public void close() {
+        if (refresherThread != null && refresherThread.isAlive()) {
+            refresherThread.interrupt();
+            try {
+                refresherThread.join();
+            } catch (InterruptedException e) {
+                log.warn("[Principal={}]: Interrupted while waiting for re-login thread to shutdown.",
+                        principalLogText(), e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    public abstract ExpiringCredential expiringCredential();
+
+    /**
+     * Determine when to sleep until before performing a refresh
+     * 
+     * @param relativeToMs
+     *            the point (in terms of number of milliseconds since the epoch) at
+     *            which to perform the calculation
+     * @return null if no refresh should occur, otherwise the time to sleep until
+     *         (in terms of the number of milliseconds since the epoch) before
+     *         performing a refresh
+     */
+    private Long refreshMs(long relativeToMs) {
+        if (expiringCredential == null) {
+            /*
+             * Re-login failed because our login() invocation did not generate a credential
+             * but also did not generate an exception. Try logging in again after some delay
+             * (it seems likely to be a bug, but it doesn't hurt to keep trying to refresh).
+             */
+            long retvalNextRefreshMs = relativeToMs + DELAY_SECONDS_BEFORE_NEXT_RETRY_WHEN_RELOGIN_FAILS * 1000L;
+            log.warn("[Principal={}]: No Expiring credential found: will try again at {}", principalLogText(),
+                    new Date(retvalNextRefreshMs));
+            return retvalNextRefreshMs;
+        }
+        long expireTimeMs = expiringCredential.expireTimeMs();
+        if (relativeToMs > expireTimeMs) {
+            boolean logoutRequiredBeforeLoggingBackIn = isLogoutRequiredBeforeLoggingBackIn();
+            if (logoutRequiredBeforeLoggingBackIn) {
+                log.error(
+                        "[Principal={}]: Current clock: {} is later than expiry {}. This may indicate a clock skew problem."
+                                + " Check that this host's and remote host's clocks are in sync. Exiting refresh thread.",
+                        principalLogText(), new Date(relativeToMs), new Date(expireTimeMs));
+                return null;
+            } else {
+                /*
+                 * Since the current soon-to-expire credential isn't logged out until we have a
+                 * new credential with a refreshed lifetime, it is possible that the current
+                 * credential could expire if the re-login continually fails over and over again
+                 * making us unable to get the new credential. Therefore keep trying rather than
+                 * exiting.
+                 */
+                long retvalNextRefreshMs = relativeToMs + DELAY_SECONDS_BEFORE_NEXT_RETRY_WHEN_RELOGIN_FAILS * 1000L;
+                log.warn("[Principal={}]: Expiring credential already expired at {}: will try to refresh again at {}",
+                        principalLogText(), new Date(expireTimeMs), new Date(retvalNextRefreshMs));
+                return retvalNextRefreshMs;
+            }
+        }
+        Long absoluteLastRefreshTimeMs = expiringCredential.absoluteLastRefreshTimeMs();
+        if (absoluteLastRefreshTimeMs != null && absoluteLastRefreshTimeMs.longValue() < expireTimeMs) {
+            log.warn("[Principal={}]: Expiring credential refresh thread exiting because the"
+                    + " expiring credential's current expiration time ({}) exceeds the latest possible refresh time ({})."
+                    + " This process will not be able to authenticate new SASL connections after that"
+                    + " time (for example, it will not be able to authenticate a new connection with a Kafka Broker).",
+                    principalLogText(), new Date(expireTimeMs), new Date(absoluteLastRefreshTimeMs.longValue()));
+            return null;
+        }
+        Long optionalStartTime = expiringCredential.startTimeMs();
+        long startMs = optionalStartTime != null ? optionalStartTime.longValue() : currentMs();
+        log.info("[Principal={}]: Expiring credential valid from {} to {}", expiringCredential.principalName(),
+                new java.util.Date(startMs), new java.util.Date(expireTimeMs));
+
+        double pct = expiringCredentialRefreshConfig.loginRefreshWindowFactor()
+                + (expiringCredentialRefreshConfig.loginRefreshWindowJitter() * RNG.nextDouble());
+        /*
+         * Ignore buffer times if the credential's remaining lifetime is less than their
+         * sum.
+         */
+        long refreshMinPeriodSeconds = expiringCredentialRefreshConfig.loginRefreshMinPeriodSeconds();
+        long clientRefreshBufferSeconds = expiringCredentialRefreshConfig.loginRefreshBufferSeconds();
+        if (relativeToMs + 1000L * (refreshMinPeriodSeconds + clientRefreshBufferSeconds) > expireTimeMs) {
+            long retvalRefreshMs = startMs + (long) ((expireTimeMs - startMs) * pct);
+            log.warn(
+                    "[Principal={}]: Expiring credential expires at {}, so buffer times of {} and {} seconds"
+                            + " at the front and back, respectively, cannot be accommodated.  We will refresh at {}.",
+                    principalLogText(), new Date(expireTimeMs), refreshMinPeriodSeconds, clientRefreshBufferSeconds,
+                    new Date(retvalRefreshMs));
+            return retvalRefreshMs;
+        }
+        long proposedRefreshMs = startMs + (long) ((expireTimeMs - startMs) * pct);
+        // Don't let it violate the requested end buffer time
+        long beginningOfEndBufferTimeMs = expireTimeMs - clientRefreshBufferSeconds * 1000;
+        if (proposedRefreshMs > beginningOfEndBufferTimeMs) {
+            log.info(
+                    "[Principal={}]: Proposed refresh time of {} extends into the desired buffer time of {} seconds before expiration, so refresh it at the desired buffer begin point, at {}",
+                    expiringCredential.principalName(), new Date(proposedRefreshMs), clientRefreshBufferSeconds,
+                    new Date(beginningOfEndBufferTimeMs));
+            return beginningOfEndBufferTimeMs;
+        }
+        // Don't let it violate the minimum refresh period
+        long endOfMinRefreshBufferTime = relativeToMs + 1000 * refreshMinPeriodSeconds;
+        if (proposedRefreshMs < endOfMinRefreshBufferTime) {
+            log.info(
+                    "[Principal={}]: Expiring credential re-login thread time adjusted from {} to {} since the former is sooner "
+                            + "than the minimum refresh interval ({} seconds from now).",
+                    principalLogText(), new Date(proposedRefreshMs), new Date(endOfMinRefreshBufferTime),
+                    refreshMinPeriodSeconds);
+            return endOfMinRefreshBufferTime;
+        }
+        // Proposed refresh time doesn't violate any constraints
+        return proposedRefreshMs;
+    }
+
+    private void reLogin() throws LoginException, ExitRefresherThreadDueToIllegalStateException {
+        synchronized (mandatoryClassToSynchronizeOnPriorToRefresh) {
+            // Only perform one refresh of a particular type at a time
+            boolean logoutRequiredBeforeLoggingBackIn = isLogoutRequiredBeforeLoggingBackIn();
+            if (hasExpiringCredential && logoutRequiredBeforeLoggingBackIn) {
+                String principalLogTextPriorToLogout = principalLogText();
+                log.info("Initiating logout for {}", principalLogTextPriorToLogout);
+                loginContext.logout();
+                // Make absolutely sure we were logged out
+                expiringCredential = expiringCredential();
+                hasExpiringCredential = expiringCredential != null;
+                if (hasExpiringCredential)
+                    // We can't force the removal because we don't know how to do it, so abort
+                    throw new ExitRefresherThreadDueToIllegalStateException(String.format(
+                            "Subject's private credentials still contains an instance of %s even though logout() was invoked; exiting refresh thread",
+                            expiringCredential.getClass().getName()));
+            }
+            /*
+             * Perform a login, making note of any credential that might need a logout()
+             * afterwards
+             */
+            ExpiringCredential optionalCredentialToLogout = expiringCredential;
+            LoginContext optionalLoginContextToLogout = loginContext;
+            loginContext = loginContextFactory.createLoginContext(ExpiringCredentialRefreshingLogin.this);
+            log.info("Initiating re-login for {}, logout() still needs to be called on a previous login = {}",
+                    principalName, optionalCredentialToLogout != null);
+            loginContext.login();
+            // Perform a logout() on any original credential if necessary
+            if (optionalCredentialToLogout != null)
+                optionalLoginContextToLogout.logout();
+            /*
+             * Get the new credential and make sure it is not any old one that required a
+             * logout() after the login()
+             */
+            expiringCredential = expiringCredential();
+            hasExpiringCredential = expiringCredential != null;
+            if (!hasExpiringCredential) {
+                /*
+                 * Re-login has failed because our login() invocation has not generated a
+                 * credential but has also not generated an exception. We won't exit here;
+                 * instead we will allow login retries in case we can somehow fix the issue (it
+                 * seems likely to be a bug, but it doesn't hurt to keep trying to refresh).
+                 */
+                log.error("No Expiring Credential after a supposedly-successful re-login");
+                principalName = null;
+            } else {
+                if (expiringCredential == optionalCredentialToLogout)
+                    /*
+                     * The login() didn't identify a new credential; we still have the old one. We
+                     * don't know how to fix this, so abort.
+                     */
+                    throw new ExitRefresherThreadDueToIllegalStateException(String.format(
+                            "Subject's private credentials still contains the previous, soon-to-expire instance of %s even though login() followed by logout() was invoked; exiting refresh thread",
+                            expiringCredential.getClass().getName()));
+                principalName = expiringCredential.principalName();
+                if (log.isDebugEnabled())
+                    log.debug("[Principal={}]: It is an expiring credential after re-login as expected",
+                            principalLogText());
+            }
+        }
+    }
+
+    private String principalLogText() {
+        return expiringCredential == null ? principalName
+                : expiringCredential.getClass().getSimpleName() + ":" + principalName;
+    }
+
+    private long currentMs() {
+        return time.milliseconds();
+    }
+
+    private boolean isLogoutRequiredBeforeLoggingBackIn() {
+        return !expiringCredentialRefreshConfig.loginRefreshReloginAllowedBeforeLogout();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerConfigException.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerConfigException.java
new file mode 100644
index 0000000..8fde861
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerConfigException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Exception thrown when there is a problem with the configuration (an invalid
+ * option in a JAAS config, for example).
+ */
+public class OAuthBearerConfigException extends KafkaException {
+    private static final long serialVersionUID = -8056105648062343518L;
+
+    public OAuthBearerConfigException(String s) {
+        super(s);
+    }
+
+    public OAuthBearerConfigException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerIllegalTokenException.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerIllegalTokenException.java
new file mode 100644
index 0000000..4a09bc9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerIllegalTokenException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import java.util.Objects;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Exception thrown when token validation fails due to a problem with the token
+ * itself (as opposed to a missing remote resource or a configuration problem)
+ */
+public class OAuthBearerIllegalTokenException extends KafkaException {
+    private static final long serialVersionUID = -5275276640051316350L;
+    private final OAuthBearerValidationResult reason;
+
+    /**
+     * Constructor
+     * 
+     * @param reason
+     *            the mandatory reason for the validation failure; it must indicate
+     *            failure
+     */
+    public OAuthBearerIllegalTokenException(OAuthBearerValidationResult reason) {
+        super(Objects.requireNonNull(reason).failureDescription());
+        if (reason.success())
+            throw new IllegalArgumentException("The reason indicates success; it must instead indicate failure");
+        this.reason = reason;
+    }
+
+    /**
+     * Return the (always non-null) reason for the validation failure
+     * 
+     * @return the reason for the validation failure
+     */
+    public OAuthBearerValidationResult reason() {
+        return reason;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerScopeUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerScopeUtils.java
new file mode 100644
index 0000000..40a5f0b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerScopeUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+/**
+ * Utility class for help dealing with
+ * <a href="https://tools.ietf.org/html/rfc6749#section-3.3">Access Token
+ * Scopes</a>
+ */
+public class OAuthBearerScopeUtils {
+    private static final Pattern INDIVIDUAL_SCOPE_ITEM_PATTERN = Pattern.compile("[\\x23-\\x5B\\x5D-\\x7E\\x21]+");
+
+    /**
+     * Return true if the given value meets the definition of a valid scope item as
+     * per <a href="https://tools.ietf.org/html/rfc6749#section-3.3">RFC 6749
+     * Section 3.3</a>, otherwise false
+     * 
+     * @param scopeItem
+     *            the mandatory scope item to check for validity
+     * @return true if the given value meets the definition of a valid scope item,
+     *         otherwise false
+     */
+    public static boolean isValidScopeItem(String scopeItem) {
+        return INDIVIDUAL_SCOPE_ITEM_PATTERN.matcher(Objects.requireNonNull(scopeItem)).matches();
+    }
+
+    /**
+     * Convert a space-delimited list of scope values (for example,
+     * <code>"scope1 scope2"</code>) to a List containing the individual elements
+     * (<code>"scope1"</code> and <code>"scope2"</code>)
+     * 
+     * @param spaceDelimitedScope
+     *            the mandatory (but possibly empty) space-delimited scope values,
+     *            each of which must be valid according to
+     *            {@link #isValidScopeItem(String)}
+     * @return the list of the given (possibly empty) space-delimited values
+     * @throws OAuthBearerConfigException
+     *             if any of the individual scope values are malformed/illegal
+     */
+    public static List<String> parseScope(String spaceDelimitedScope) throws OAuthBearerConfigException {
+        List<String> retval = new ArrayList<>();
+        for (String individualScopeItem : Objects.requireNonNull(spaceDelimitedScope).split(" ")) {
+            if (!individualScopeItem.isEmpty()) {
+                if (!isValidScopeItem(individualScopeItem))
+                    throw new OAuthBearerConfigException(String.format("Invalid scope value: %s", individualScopeItem));
+                retval.add(individualScopeItem);
+            }
+        }
+        return Collections.unmodifiableList(retval);
+    }
+
+    private OAuthBearerScopeUtils() {
+        // empty
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredJws.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredJws.java
new file mode 100644
index 0000000..aedde8a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredJws.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.NumericNode;
+
+/**
+ * A simple unsecured JWS implementation. The '{@code nbf}' claim is ignored if
+ * it is given because the related logic is not required for Kafka testing and
+ * development purposes.
+ * 
+ * @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>
+ */
+public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
+    private final String compactSerialization;
+    private final List<String> splits;
+    private final Map<String, Object> header;
+    private final String principalClaimName;
+    private final String scopeClaimName;
+    private final Map<String, Object> claims;
+    private final Set<String> scope;
+    private final long lifetime;
+    private final String principalName;
+    private final Long startTimeMs;
+
+    /**
+     * Constructor with the given principal and scope claim names
+     * 
+     * @param compactSerialization
+     *            the compact serialization to parse as an unsecured JWS
+     * @param principalClaimName
+     *            the required principal claim name
+     * @param scopeClaimName
+     *            the required scope claim name
+     * @throws OAuthBearerIllegalTokenException
+     *             if the compact serialization is not a valid unsecured JWS
+     *             (meaning it did not have 3 dot-separated Base64URL sections
+     *             without an empty digital signature; or the header or claims
+     *             either are not valid Base 64 URL encoded values or are not JSON
+     *             after decoding; or the mandatory '{@code alg}' header value is
+     *             not "{@code none}")
+     */
+    public OAuthBearerUnsecuredJws(String compactSerialization, String principalClaimName, String scopeClaimName)
+            throws OAuthBearerIllegalTokenException {
+        this.compactSerialization = Objects.requireNonNull(compactSerialization);
+        if (compactSerialization.contains(".."))
+            throw new OAuthBearerIllegalTokenException(
+                    OAuthBearerValidationResult.newFailure("Malformed compact serialization contains '..'"));
+        this.splits = extractCompactSerializationSplits();
+        this.header = toMap(splits().get(0));
+        String claimsSplit = splits.get(1);
+        this.claims = toMap(claimsSplit);
+        String alg = Objects.requireNonNull(header().get("alg"), "JWS header must have an Algorithm value").toString();
+        if (!"none".equals(alg))
+            throw new OAuthBearerIllegalTokenException(
+                    OAuthBearerValidationResult.newFailure("Unsecured JWS must have 'none' for an algorithm"));
+        String digitalSignatureSplit = splits.get(2);
+        if (!digitalSignatureSplit.isEmpty())
+            throw new OAuthBearerIllegalTokenException(
+                    OAuthBearerValidationResult.newFailure("Unsecured JWS must not contain a digital signature"));
+        this.principalClaimName = Objects.requireNonNull(principalClaimName).trim();
+        if (this.principalClaimName.isEmpty())
+            throw new IllegalArgumentException("Must specify a non-blank principal claim name");
+        this.scopeClaimName = Objects.requireNonNull(scopeClaimName).trim();
+        if (this.scopeClaimName.isEmpty())
+            throw new IllegalArgumentException("Must specify a non-blank scope claim name");
+        this.scope = calculateScope();
+        Number expirationTimeSeconds = expirationTime();
+        if (expirationTimeSeconds == null)
+            throw new OAuthBearerIllegalTokenException(
+                    OAuthBearerValidationResult.newFailure("No expiration time in JWT"));
+        lifetime = convertClaimTimeInSecondsToMs(expirationTimeSeconds);
+        String principalName = claim(this.principalClaimName, String.class);
+        if (principalName == null || principalName.trim().isEmpty())
+            throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult
+                    .newFailure("No principal name in JWT claim: " + this.principalClaimName));
+        this.principalName = principalName;
+        this.startTimeMs = calculateStartTimeMs();
+    }
+
+    @Override
+    public String value() {
+        return compactSerialization;
+    }
+
+    /**
+     * Return the 3 or 5 dot-separated sections of the JWT compact serialization
+     * 
+     * @return the 3 or 5 dot-separated sections of the JWT compact serialization
+     */
+    public List<String> splits() {
+        return splits;
+    }
+
+    /**
+     * Return the JOSE Header as a {@code Map}
+     * 
+     * @return the JOSE header
+     */
+    public Map<String, Object> header() {
+        return header;
+    }
+
+    @Override
+    public String principalName() {
+        return principalName;
+    }
+
+    @Override
+    public Long startTimeMs() {
+        return startTimeMs;
+    }
+
+    @Override
+    public long lifetimeMs() {
+        return lifetime;
+    }
+
+    @Override
+    public Set<String> scope() throws OAuthBearerIllegalTokenException {
+        return scope;
+    }
+
+    /**
+     * Return the JWT Claim Set as a {@code Map}
+     * 
+     * @return the (always non-null but possibly empty) claims
+     */
+    public Map<String, Object> claims() {
+        return claims;
+    }
+
+    /**
+     * Return the (always non-null/non-empty) principal claim name
+     * 
+     * @return the (always non-null/non-empty) principal claim name
+     */
+    public String principalClaimName() {
+        return principalClaimName;
+    }
+
+    /**
+     * Return the (always non-null/non-empty) scope claim name
+     * 
+     * @return the (always non-null/non-empty) scope claim name
+     */
+    public String scopeClaimName() {
+        return scopeClaimName;
+    }
+
+    /**
+     * Indicate if the claim exists and is the given type
+     * 
+     * @param claimName
+     *            the mandatory JWT claim name
+     * @param type
+     *            the mandatory type, which should either be String.class,
+     *            Number.class, or List.class
+     * @return true if the claim exists and is the given type, otherwise false
+     */
+    public boolean isClaimType(String claimName, Class<?> type) {
+        Object value = rawClaim(claimName);
+        Objects.requireNonNull(type);
+        if (value == null)
+            return false;
+        if (type == String.class && value instanceof String)
+            return true;
+        if (type == Number.class && value instanceof Number)
+            return true;
+        return type == List.class && value instanceof List;
+    }
+
+    /**
+     * Extract a claim of the given type
+     * 
+     * @param claimName
+     *            the mandatory JWT claim name
+     * @param type
+     *            the mandatory type, which must either be String.class,
+     *            Number.class, or List.class
+     * @return the claim if it exists, otherwise null
+     * @throws OAuthBearerIllegalTokenException
+     *             if the claim exists but is not the given type
+     */
+    public <T> T claim(String claimName, Class<T> type) throws OAuthBearerIllegalTokenException {
+        Object value = rawClaim(claimName);
+        try {
+            return Objects.requireNonNull(type).cast(value);
+        } catch (ClassCastException e) {
+            throw new OAuthBearerIllegalTokenException(
+                    OAuthBearerValidationResult.newFailure(String.format("The '%s' claim was not of type %s: %s",
+                            claimName, type.getSimpleName(), value.getClass().getSimpleName())));
+        }
+    }
+
+    /**
+     * Extract a claim in its raw form
+     * 
+     * @param claimName
+     *            the mandatory JWT claim name
+     * @return the raw claim value, if it exists, otherwise null
+     */
+    public Object rawClaim(String claimName) {
+        return claims().get(Objects.requireNonNull(claimName));
+    }
+
+    /**
+     * Return the
+     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
+     * Time</a> claim
+     * 
+     * @return the <a href=
+     *         "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
+     *         Time</a> claim if available, otherwise null
+     * @throws OAuthBearerIllegalTokenException
+     *             if the claim value is the incorrect type
+     */
+    public Number expirationTime() throws OAuthBearerIllegalTokenException {
+        return claim("exp", Number.class);
+    }
+
+    /**
+     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
+     * At</a> claim
+     * 
+     * @return the
+     *         <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
+     *         At</a> claim if available, otherwise null
+     * @throws OAuthBearerIllegalTokenException
+     *             if the claim value is the incorrect type
+     */
+    public Number issuedAt() throws OAuthBearerIllegalTokenException {
+        return claim("iat", Number.class);
+    }
+
+    /**
+     * Return the
+     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
+     * 
+     * @return the <a href=
+     *         "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
+     *         if available, otherwise null
+     * @throws OAuthBearerIllegalTokenException
+     *             if the claim value is the incorrect type
+     */
+    public String subject() throws OAuthBearerIllegalTokenException {
+        return claim("sub", String.class);
+    }
+
+    /**
+     * Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
+     * object, and return the map of member names to their values (each value being
+     * represented as either a String, a Number, or a List of Strings).
+     * 
+     * @param split
+     *            the value to decode and parse
+     * @return the map of JSON member names to their String, Number, or String List
+     *         value
+     * @throws OAuthBearerIllegalTokenException
+     *             if the given Base64URL-encoded value cannot be decoded or parsed
+     */
+    public static Map<String, Object> toMap(String split) throws OAuthBearerIllegalTokenException {
+        Map<String, Object> retval = new HashMap<>();
+        try {
+            byte[] decode = Base64.getDecoder().decode(split);
+            JsonNode jsonNode = new ObjectMapper().readTree(decode);
+            if (jsonNode == null)
+                throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult.newFailure("malformed JSON"));
+            for (Iterator<Entry<String, JsonNode>> iterator = jsonNode.fields(); iterator.hasNext();) {
+                Entry<String, JsonNode> entry = iterator.next();
+                retval.put(entry.getKey(), convert(entry.getValue()));
+            }
+            return Collections.unmodifiableMap(retval);
+        } catch (IllegalArgumentException e) {
+            // potentially thrown by java.util.Base64.Decoder implementations
+            throw new OAuthBearerIllegalTokenException(
+                    OAuthBearerValidationResult.newFailure("malformed Base64 URL encoded value"));
+        } catch (IOException e) {
+            throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult.newFailure("malformed JSON"));
+        }
+    }
+
+    private List<String> extractCompactSerializationSplits() {
+        List<String> tmpSplits = new ArrayList<>(Arrays.asList(compactSerialization.split("\\.")));
+        if (compactSerialization.endsWith("."))
+            tmpSplits.add("");
+        if (tmpSplits.size() != 3)
+            throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult.newFailure(
+                    "Unsecured JWS compact serializations must have 3 dot-separated Base64URL-encoded values"));
+        return Collections.unmodifiableList(tmpSplits);
+    }
+
+    private static Object convert(JsonNode value) {
+        if (value.isArray()) {
+            List<String> retvalList = new ArrayList<>();
+            for (JsonNode arrayElement : value) {
+                retvalList.add(arrayElement.asText());
+            }
+            return retvalList;
+        }
+        return value.getNodeType() == JsonNodeType.NUMBER ? ((NumericNode) value).numberValue() : value.asText();
+    }
+
+    private Long calculateStartTimeMs() throws OAuthBearerIllegalTokenException {
+        Number issuedAtSeconds = claim("iat", Number.class);
+        return issuedAtSeconds == null ? null : convertClaimTimeInSecondsToMs(issuedAtSeconds);
+    }
+
+    private static long convertClaimTimeInSecondsToMs(Number claimValue) {
+        return Math.round(claimValue.doubleValue() * 1000);
+    }
+
+    private Set<String> calculateScope() {
+        String scopeClaimName = scopeClaimName();
+        if (isClaimType(scopeClaimName, String.class)) {
+            String scopeClaimValue = claim(scopeClaimName, String.class);
+            if (scopeClaimValue.trim().isEmpty())
+                return Collections.emptySet();
+            else {
+                Set<String> retval = new HashSet<>();
+                retval.add(scopeClaimValue.trim());
+                return Collections.unmodifiableSet(retval);
+            }
+        }
+        List<?> scopeClaimValue = claim(scopeClaimName, List.class);
+        if (scopeClaimValue == null || scopeClaimValue.isEmpty())
+            return Collections.emptySet();
+        @SuppressWarnings("unchecked")
+        List<String> stringList = (List<String>) scopeClaimValue;
+        Set<String> retval = new HashSet<>();
+        for (String scope : stringList) {
+            if (scope != null && !scope.trim().isEmpty()) {
+                retval.add(scope.trim());
+            }
+        }
+        return Collections.unmodifiableSet(retval);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
new file mode 100644
index 0000000..183f587
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code CallbackHandler} that recognizes {@link OAuthBearerTokenCallback}
+ * and returns an unsecured OAuth 2 bearer token.
+ * <p>
+ * Claims and their values on the returned token can be specified using
+ * {@code unsecuredLoginStringClaim_<claimname>},
+ * {@code unsecuredLoginNumberClaim_<claimname>}, and
+ * {@code unsecuredLoginListClaim_<claimname>} options. The first character of
+ * the value is taken as the delimiter for list claims. You may define any claim
+ * name and value except '{@code iat}' and '{@code exp}', both of which are
+ * calculated automatically.
+ * <p>
+ * This implementation also accepts the following options:
+ * <ul>
+ * <li>{@code unsecuredLoginPrincipalClaimName} set to a custom claim name if
+ * you wish the name of the String claim holding the principal name to be
+ * something other than '{@code sub}'.</li>
+ * <li>{@code unsecuredLoginLifetimeSeconds} set to an integer value if the
+ * token expiration is to be set to something other than the default value of
+ * 3600 seconds (which is 1 hour). The '{@code exp}' claim reflects the
+ * expiration time.</li>
+ * <li>{@code unsecuredLoginScopeClaimName} set to a custom claim name if you
+ * wish the name of the String or String List claim holding any token scope to
+ * be something other than '{@code scope}'</li>
+ * </ul>
+ * For example:
+ * 
+ * <pre>
+ * KafkaClient {
+ *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
+ *      unsecuredLoginStringClaim_sub="thePrincipalName"
+ *      unsecuredLoginListClaim_scope="|scopeValue1|scopeValue2"
+ *      unsecuredLoginLifetimeSeconds="60";
+ * };
+ * </pre>
+ * 
+ * This class is the default when the SASL mechanism is OAUTHBEARER and no value
+ * is explicitly set via either the {@code sasl.login.callback.handler.class}
+ * client configuration property or the
+ * {@code listener.name.sasl_[plaintext|ssl].oauthbearer.sasl.login.callback.handler.class}
+ * broker configuration property.
+ */
+public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCallbackHandler {
+    private final Logger log = LoggerFactory.getLogger(OAuthBearerUnsecuredLoginCallbackHandler.class);
+    private static final String OPTION_PREFIX = "unsecuredLogin";
+    private static final String PRINCIPAL_CLAIM_NAME_OPTION = OPTION_PREFIX + "PrincipalClaimName";
+    private static final String LIFETIME_SECONDS_OPTION = OPTION_PREFIX + "LifetimeSeconds";
+    private static final String SCOPE_CLAIM_NAME_OPTION = OPTION_PREFIX + "ScopeClaimName";
+    private static final Set<String> RESERVED_CLAIMS = Collections
+            .unmodifiableSet(new HashSet<>(Arrays.asList("iat", "exp")));
+    private static final String DEFAULT_PRINCIPAL_CLAIM_NAME = "sub";
+    private static final String DEFAULT_LIFETIME_SECONDS_ONE_HOUR = "3600";
+    private static final String DEFAULT_SCOPE_CLAIM_NAME = "scope";
+    private static final String STRING_CLAIM_PREFIX = OPTION_PREFIX + "StringClaim_";
+    private static final String NUMBER_CLAIM_PREFIX = OPTION_PREFIX + "NumberClaim_";
+    private static final String LIST_CLAIM_PREFIX = OPTION_PREFIX + "ListClaim_";
+    private static final String QUOTE = "\"";
+    private Time time = Time.SYSTEM;
+    private Map<String, String> moduleOptions = null;
+    private boolean configured = false;
+
+    /**
+     * For testing
+     * 
+     * @param time
+     *            the mandatory time to set
+     */
+    void time(Time time) {
+        this.time = Objects.requireNonNull(time);
+    }
+
+    /**
+     * Return true if this instance has been configured, otherwise false
+     * 
+     * @return true if this instance has been configured, otherwise false
+     */
+    public boolean configured() {
+        return configured;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(
+                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+                            jaasConfigEntries.size()));
+        this.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
+        configured = true;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        if (!configured())
+            throw new IllegalStateException("Callback handler not configured");
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerTokenCallback)
+                try {
+                    handleCallback((OAuthBearerTokenCallback) callback);
+                } catch (KafkaException e) {
+                    throw new IOException(e.getMessage(), e);
+                }
+            else
+                throw new UnsupportedCallbackException(callback);
+        }
+    }
+
+    @Override
+    public void close() {
+        // empty
+    }
+
+    private void handleCallback(OAuthBearerTokenCallback callback) throws IOException {
+        if (callback.token() != null)
+            throw new IllegalArgumentException("Callback had a token already");
+        String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION);
+        String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty()
+                ? principalClaimNameValue.trim()
+                : DEFAULT_PRINCIPAL_CLAIM_NAME;
+        String scopeClaimNameValue = optionValue(SCOPE_CLAIM_NAME_OPTION);
+        String scopeClaimName = scopeClaimNameValue != null && !scopeClaimNameValue.trim().isEmpty()
+                ? scopeClaimNameValue.trim()
+                : DEFAULT_SCOPE_CLAIM_NAME;
+        String headerJson = "{" + claimOrHeaderJsonText("alg", "none") + "}";
+        String lifetimeSecondsValueToUse = optionValue(LIFETIME_SECONDS_OPTION, DEFAULT_LIFETIME_SECONDS_ONE_HOUR);
+        String claimsJson;
+        try {
+            claimsJson = String.format("{%s,%s%s}", expClaimText(Long.parseLong(lifetimeSecondsValueToUse)),
+                    claimOrHeaderJsonText("iat", time.milliseconds() / 1000.0),
+                    commaPrependedStringNumberAndListClaimsJsonText());
+        } catch (NumberFormatException e) {
+            throw new OAuthBearerConfigException(e.getMessage());
+        }
+        try {
+            Encoder urlEncoderNoPadding = Base64.getUrlEncoder().withoutPadding();
+            OAuthBearerUnsecuredJws jws = new OAuthBearerUnsecuredJws(
+                    String.format("%s.%s.",
+                            urlEncoderNoPadding.encodeToString(headerJson.getBytes(StandardCharsets.UTF_8)),
+                            urlEncoderNoPadding.encodeToString(claimsJson.getBytes(StandardCharsets.UTF_8))),
+                    principalClaimName, scopeClaimName);
+            log.info("Retrieved token with principal {}", jws.principalName());
+            callback.token(jws);
+        } catch (OAuthBearerIllegalTokenException e) {
+            // occurs if the principal claim doesn't exist or has an empty value
+            throw new OAuthBearerConfigException(e.getMessage(), e);
+        }
+    }
+
+    private String commaPrependedStringNumberAndListClaimsJsonText() throws OAuthBearerConfigException {
+        StringBuilder sb = new StringBuilder();
+        for (String key : moduleOptions.keySet()) {
+            if (key.startsWith(STRING_CLAIM_PREFIX) && key.length() > STRING_CLAIM_PREFIX.length())
+                sb.append(',').append(claimOrHeaderJsonText(
+                        confirmNotReservedClaimName(key.substring(STRING_CLAIM_PREFIX.length())), optionValue(key)));
+            else if (key.startsWith(NUMBER_CLAIM_PREFIX) && key.length() > NUMBER_CLAIM_PREFIX.length())
+                sb.append(',')
+                        .append(claimOrHeaderJsonText(
+                                confirmNotReservedClaimName(key.substring(NUMBER_CLAIM_PREFIX.length())),
+                                Double.valueOf(optionValue(key))));
+            else if (key.startsWith(LIST_CLAIM_PREFIX) && key.length() > LIST_CLAIM_PREFIX.length())
+                sb.append(',')
+                        .append(claimOrHeaderJsonArrayText(
+                                confirmNotReservedClaimName(key.substring(LIST_CLAIM_PREFIX.length())),
+                                listJsonText(optionValue(key))));
+        }
+        return sb.toString();
+    }
+
+    private String confirmNotReservedClaimName(String claimName) throws OAuthBearerConfigException {
+        if (RESERVED_CLAIMS.contains(claimName))
+            throw new OAuthBearerConfigException(String.format("Cannot explicitly set the '%s' claim", claimName));
+        return claimName;
+    }
+
+    private String listJsonText(String value) {
+        if (value.isEmpty() || value.length() <= 1)
+            return "[]";
+        String delimiter;
+        String unescapedDelimiterChar = value.substring(0, 1);
+        switch (unescapedDelimiterChar) {
+            case "\\":
+            case ".":
+            case "[":
+            case "(":
+            case "{":
+            case "|":
+            case "^":
+            case "$":
+                delimiter = "\\" + unescapedDelimiterChar;
+                break;
+            default:
+                delimiter = unescapedDelimiterChar;
+                break;
+        }
+        String listText = value.substring(1);
+        String[] elements = listText.split(delimiter);
+        StringBuilder sb = new StringBuilder();
+        for (String element : elements) {
+            sb.append(sb.length() == 0 ? '[' : ',');
+            sb.append('"').append(escape(element)).append('"');
+        }
+        if (listText.startsWith(unescapedDelimiterChar) || listText.endsWith(unescapedDelimiterChar)
+                || listText.contains(unescapedDelimiterChar + unescapedDelimiterChar))
+            sb.append(",\"\"");
+        return sb.append(']').toString();
+    }
+
+    private String optionValue(String key) {
+        return optionValue(key, null);
+    }
+
+    private String optionValue(String key, String defaultValue) {
+        String explicitValue = option(key);
+        return explicitValue != null ? explicitValue : defaultValue;
+    }
+
+    private String option(String key) {
+        if (!configured)
+            throw new IllegalStateException("Callback handler not configured");
+        return moduleOptions.get(Objects.requireNonNull(key));
+    }
+
+    private String claimOrHeaderJsonText(String claimName, Number claimValue) {
+        return QUOTE + escape(claimName) + QUOTE + ":" + claimValue;
+    }
+
+    private String claimOrHeaderJsonText(String claimName, String claimValue) {
+        return QUOTE + escape(claimName) + QUOTE + ":" + QUOTE + escape(claimValue) + QUOTE;
+    }
+
+    private String claimOrHeaderJsonArrayText(String claimName, String escapedClaimValue) {
+        if (!escapedClaimValue.startsWith("[") || !escapedClaimValue.endsWith("]"))
+            throw new IllegalArgumentException(String.format("Illegal JSON array: %s", escapedClaimValue));
+        return QUOTE + escape(claimName) + QUOTE + ":" + escapedClaimValue;
+    }
+
+    private String escape(String jsonStringValue) {
+        return jsonStringValue.replace("\"", "\\\"").replace("\\", "\\\\");
+    }
+
+    private String expClaimText(long lifetimeSeconds) {
+        return claimOrHeaderJsonText("exp", time.milliseconds() / 1000.0 + lifetimeSeconds);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
new file mode 100644
index 0000000..0e61727
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code CallbackHandler} that recognizes
+ * {@link OAuthBearerValidatorCallback} and validates an unsecured OAuth 2
+ * bearer token. It requires there to be an <code>"exp" (Expiration Time)</code>
+ * claim of type Number. If <code>"iat" (Issued At)</code> or
+ * <code>"nbf" (Not Before)</code> claims are present each must be a number that
+ * precedes the Expiration Time claim, and if both are present the Not Before
+ * claim must not precede the Issued At claim. It also accepts the following
+ * options, none of which are required:
+ * <ul>
+ * <li>{@code unsecuredValidatorPrincipalClaimName} set to a non-empty value if
+ * you wish a particular String claim holding a principal name to be checked for
+ * existence; the default is to check for the existence of the '{@code sub}'
+ * claim</li>
+ * <li>{@code unsecuredValidatorScopeClaimName} set to a custom claim name if
+ * you wish the name of the String or String List claim holding any token scope
+ * to be something other than '{@code scope}'</li>
+ * <li>{@code unsecuredValidatorRequiredScope} set to a space-delimited list of
+ * scope values if you wish the String/String List claim holding the token scope
+ * to be checked to make sure it contains certain values</li>
+ * <li>{@code unsecuredValidatorAllowableClockSkewMs} set to a positive integer
+ * value if you wish to allow up to some number of positive milliseconds of
+ * clock skew (the default is 0)</li>
+ * <ul>
+ * For example:
+ * 
+ * <pre>
+ * KafkaServer {
+ *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
+ *      unsecuredLoginStringClaim_sub="thePrincipalName"
+ *      unsecuredLoginListClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
+ *      unsecuredValidatorRequiredScope="LOGIN_TO_KAFKA"
+ *      unsecuredValidatorAllowableClockSkewMs="3000";
+ * };
+ * </pre>
+ * 
+ * This class is the default when the SASL mechanism is OAUTHBEARER and no value
+ * is explicitly set via the
+ * {@code listener.name.sasl_[plaintext|ssl].oauthbearer.sasl.server.callback.handler.class}
+ * broker configuration property.
+ */
+public class OAuthBearerUnsecuredValidatorCallbackHandler implements AuthenticateCallbackHandler {
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerUnsecuredValidatorCallbackHandler.class);
+    private static final String OPTION_PREFIX = "unsecuredValidator";
+    private static final String PRINCIPAL_CLAIM_NAME_OPTION = OPTION_PREFIX + "PrincipalClaimName";
+    private static final String SCOPE_CLAIM_NAME_OPTION = OPTION_PREFIX + "ScopeClaimName";
+    private static final String REQUIRED_SCOPE_OPTION = OPTION_PREFIX + "RequiredScope";
+    private static final String ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION = OPTION_PREFIX + "AllowableClockSkewMs";
+    private Time time = Time.SYSTEM;
+    private Map<String, String> moduleOptions = null;
+    private boolean configured = false;
+
+    /**
+     * For testing
+     * 
+     * @param time
+     *            the mandatory time to set
+     */
+    void time(Time time) {
+        this.time = Objects.requireNonNull(time);
+    }
+
+    /**
+     * Return true if this instance has been configured, otherwise false
+     * 
+     * @return true if this instance has been configured, otherwise false
+     */
+    public boolean configured() {
+        return configured;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(
+                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+                            jaasConfigEntries.size()));
+        final Map<String, String> unmodifiableModuleOptions = Collections
+                .unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
+        this.moduleOptions = unmodifiableModuleOptions;
+        configured = true;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        if (!configured())
+            throw new IllegalStateException("Callback handler not configured");
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerValidatorCallback) {
+                OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
+                try {
+                    handleCallback(validationCallback);
+                } catch (OAuthBearerIllegalTokenException e) {
+                    OAuthBearerValidationResult failureReason = e.reason();
+                    String failureScope = failureReason.failureScope();
+                    validationCallback.error(failureScope != null ? "insufficient_scope" : "invalid_token",
+                            failureScope, failureReason.failureOpenIdConfig());
+                }
+            } else
+                throw new UnsupportedCallbackException(callback);
+        }
+    }
+
+    @Override
+    public void close() {
+        // empty
+    }
+
+    private void handleCallback(OAuthBearerValidatorCallback callback) {
+        String tokenValue = callback.tokenValue();
+        if (tokenValue == null)
+            throw new IllegalArgumentException("Callback missing required token value");
+        String principalClaimName = principalClaimName();
+        String scopeClaimName = scopeClaimName();
+        List<String> requiredScope = requiredScope();
+        int allowableClockSkewMs = allowableClockSkewMs();
+        OAuthBearerUnsecuredJws unsecuredJwt = new OAuthBearerUnsecuredJws(tokenValue, principalClaimName,
+                scopeClaimName);
+        long now = time.milliseconds();
+        OAuthBearerValidationUtils
+                .validateClaimForExistenceAndType(unsecuredJwt, true, principalClaimName, String.class)
+                .throwExceptionIfFailed();
+        OAuthBearerValidationUtils.validateIssuedAt(unsecuredJwt, false, now, allowableClockSkewMs)
+                .throwExceptionIfFailed();
+        OAuthBearerValidationUtils.validateExpirationTime(unsecuredJwt, now, allowableClockSkewMs)
+                .throwExceptionIfFailed();
+        OAuthBearerValidationUtils.validateTimeConsistency(unsecuredJwt).throwExceptionIfFailed();
+        OAuthBearerValidationUtils.validateScope(unsecuredJwt, requiredScope).throwExceptionIfFailed();
+        log.info("Successfully validated token with principal {}: {}", unsecuredJwt.principalName(),
+                unsecuredJwt.claims().toString());
+        callback.token(unsecuredJwt);
+    }
+
+    private String principalClaimName() {
+        String principalClaimNameValue = option(PRINCIPAL_CLAIM_NAME_OPTION);
+        String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty()
+                ? principalClaimNameValue.trim()
+                : "sub";
+        return principalClaimName;
+    }
+
+    private String scopeClaimName() {
+        String scopeClaimNameValue = option(SCOPE_CLAIM_NAME_OPTION);
+        String scopeClaimName = scopeClaimNameValue != null && !scopeClaimNameValue.trim().isEmpty()
+                ? scopeClaimNameValue.trim()
+                : "scope";
+        return scopeClaimName;
+    }
+
+    private List<String> requiredScope() {
+        String requiredSpaceDelimitedScope = option(REQUIRED_SCOPE_OPTION);
+        List<String> requiredScope = requiredSpaceDelimitedScope == null || requiredSpaceDelimitedScope.trim().isEmpty()
+                ? Collections.<String>emptyList()
+                : OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
+        return requiredScope;
+    }
+
+    private int allowableClockSkewMs() {
+        String allowableClockSkewMsValue = option(ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION);
+        int allowableClockSkewMs = 0;
+        try {
+            allowableClockSkewMs = allowableClockSkewMsValue == null || allowableClockSkewMsValue.trim().isEmpty() ? 0
+                    : Integer.parseInt(allowableClockSkewMsValue.trim());
+        } catch (NumberFormatException e) {
+            throw new OAuthBearerConfigException(e.getMessage(), e);
+        }
+        if (allowableClockSkewMs < 0) {
+            throw new OAuthBearerConfigException(
+                    String.format("Allowable clock skew millis must not be negative: %s", allowableClockSkewMsValue));
+        }
+        return allowableClockSkewMs;
+    }
+
+    private String option(String key) {
+        if (!configured)
+            throw new IllegalStateException("Callback handler not configured");
+        return moduleOptions.get(Objects.requireNonNull(key));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationResult.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationResult.java
new file mode 100644
index 0000000..36551e9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationResult.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import java.io.Serializable;
+
+/**
+ * The result of some kind of token validation
+ */
+public class OAuthBearerValidationResult implements Serializable {
+    private static final long serialVersionUID = 5774669940899777373L;
+    private final boolean success;
+    private final String failureDescription;
+    private final String failureScope;
+    private final String failureOpenIdConfig;
+
+    /**
+     * Return an instance indicating success
+     * 
+     * @return an instance indicating success
+     */
+    public static OAuthBearerValidationResult newSuccess() {
+        return new OAuthBearerValidationResult(true, null, null, null);
+    }
+
+    /**
+     * Return a new validation failure instance
+     * 
+     * @param failureDescription
+     *            optional description of the failure
+     * @return a new validation failure instance
+     */
+    public static OAuthBearerValidationResult newFailure(String failureDescription) {
+        return newFailure(failureDescription, null, null);
+    }
+
+    /**
+     * Return a new validation failure instance
+     * 
+     * @param failureDescription
+     *            optional description of the failure
+     * @param failureScope
+     *            optional scope to be reported with the failure
+     * @param failureOpenIdConfig
+     *            optional OpenID Connect configuration to be reported with the
+     *            failure
+     * @return a new validation failure instance
+     */
+    public static OAuthBearerValidationResult newFailure(String failureDescription, String failureScope,
+            String failureOpenIdConfig) {
+        return new OAuthBearerValidationResult(false, failureDescription, failureScope, failureOpenIdConfig);
+    }
+
+    private OAuthBearerValidationResult(boolean success, String failureDescription, String failureScope,
+            String failureOpenIdConfig) {
+        if (success && (failureScope != null || failureOpenIdConfig != null))
+            throw new IllegalArgumentException("success was indicated but failure scope/OpenIdConfig were provided");
+        this.success = success;
+        this.failureDescription = failureDescription;
+        this.failureScope = failureScope;
+        this.failureOpenIdConfig = failureOpenIdConfig;
+    }
+
+    /**
+     * Return true if this instance indicates success, otherwise false
+     * 
+     * @return true if this instance indicates success, otherwise false
+     */
+    public boolean success() {
+        return success;
+    }
+
+    /**
+     * Return the (potentially null) descriptive message for the failure
+     * 
+     * @return the (potentially null) descriptive message for the failure
+     */
+    public String failureDescription() {
+        return failureDescription;
+    }
+
+    /**
+     * Return the (potentially null) scope to be reported with the failure
+     * 
+     * @return the (potentially null) scope to be reported with the failure
+     */
+    public String failureScope() {
+        return failureScope;
+    }
+
+    /**
+     * Return the (potentially null) OpenID Connect configuration to be reported
+     * with the failure
+     * 
+     * @return the (potentially null) OpenID Connect configuration to be reported
+     *         with the failure
+     */
+    public String failureOpenIdConfig() {
+        return failureOpenIdConfig;
+    }
+
+    /**
+     * Raise an exception if this instance indicates failure, otherwise do nothing
+     * 
+     * @throws OAuthBearerIllegalTokenException
+     *             if this instance indicates failure
+     */
+    public void throwExceptionIfFailed() throws OAuthBearerIllegalTokenException {
+        if (!success())
+            throw new OAuthBearerIllegalTokenException(this);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationUtils.java
new file mode 100644
index 0000000..6f1b2fe
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationUtils.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+
+public class OAuthBearerValidationUtils {
+    /**
+     * Validate the given claim for existence and type. It can be required to exist
+     * in the given claims, and if it exists it must be one of the types indicated
+     * 
+     * @param jwt
+     *            the mandatory JWT to which the validation will be applied
+     * @param required
+     *            true if the claim is required to exist
+     * @param claimName
+     *            the required claim name identifying the claim to be checked
+     * @param allowedTypes
+     *            one or more of {@code String.class}, {@code Number.class}, and
+     *            {@code List.class} identifying the type(s) that the claim value is
+     *            allowed to be if it exists
+     * @return the result of the validation
+     */
+    public static OAuthBearerValidationResult validateClaimForExistenceAndType(OAuthBearerUnsecuredJws jwt,
+            boolean required, String claimName, Class<?>... allowedTypes) {
+        Object rawClaim = Objects.requireNonNull(jwt).rawClaim(Objects.requireNonNull(claimName));
+        if (rawClaim == null)
+            return required
+                    ? OAuthBearerValidationResult.newFailure(String.format("Required claim missing: %s", claimName))
+                    : OAuthBearerValidationResult.newSuccess();
+        for (Class<?> allowedType : allowedTypes) {
+            if (allowedType != null && allowedType.isAssignableFrom(rawClaim.getClass()))
+                return OAuthBearerValidationResult.newSuccess();
+        }
+        return OAuthBearerValidationResult.newFailure(String.format("The %s claim had the incorrect type: %s",
+                claimName, rawClaim.getClass().getSimpleName()));
+    }
+
+    /**
+     * Validate the 'iat' (Issued At) claim. It can be required to exist in the
+     * given claims, and if it exists it must be a (potentially fractional) number
+     * of seconds since the epoch defining when the JWT was issued; it is a
+     * validation error if the Issued At time is after the time at which the check
+     * is being done (plus any allowable clock skew).
+     * 
+     * @param jwt
+     *            the mandatory JWT to which the validation will be applied
+     * @param required
+     *            true if the claim is required to exist
+     * @param whenCheckTimeMs
+     *            the time relative to which the validation is to occur
+     * @param allowableClockSkewMs
+     *            non-negative number to take into account some potential clock skew
+     * @return the result of the validation
+     * @throws OAuthBearerConfigException
+     *             if the given allowable clock skew is negative
+     */
+    public static OAuthBearerValidationResult validateIssuedAt(OAuthBearerUnsecuredJws jwt, boolean required,
+            long whenCheckTimeMs, int allowableClockSkewMs) throws OAuthBearerConfigException {
+        Number value;
+        try {
+            value = Objects.requireNonNull(jwt).issuedAt();
+        } catch (OAuthBearerIllegalTokenException e) {
+            return e.reason();
+        }
+        boolean exists = value != null;
+        if (!exists)
+            return doesNotExistResult(required, "iat");
+        double doubleValue = value.doubleValue();
+        return 1000 * doubleValue > whenCheckTimeMs + confirmNonNegative(allowableClockSkewMs)
+                ? OAuthBearerValidationResult.newFailure(String.format(
+                        "The Issued At value (%f seconds) was after the indicated time (%d ms) plus allowable clock skew (%d ms)",
+                        doubleValue, whenCheckTimeMs, allowableClockSkewMs))
+                : OAuthBearerValidationResult.newSuccess();
+    }
+
+    /**
+     * Validate the 'exp' (Expiration Time) claim. It must exist and it must be a
+     * (potentially fractional) number of seconds defining the point at which the
+     * JWT expires. It is a validation error if the time at which the check is being
+     * done (minus any allowable clock skew) is on or after the Expiration Time
+     * time.
+     * 
+     * @param jwt
+     *            the mandatory JWT to which the validation will be applied
+     * @param whenCheckTimeMs
+     *            the time relative to which the validation is to occur
+     * @param allowableClockSkewMs
+     *            non-negative number to take into account some potential clock skew
+     * @return the result of the validation
+     * @throws OAuthBearerConfigException
+     *             if the given allowable clock skew is negative
+     */
+    public static OAuthBearerValidationResult validateExpirationTime(OAuthBearerUnsecuredJws jwt, long whenCheckTimeMs,
+            int allowableClockSkewMs) throws OAuthBearerConfigException {
+        Number value;
+        try {
+            value = Objects.requireNonNull(jwt).expirationTime();
+        } catch (OAuthBearerIllegalTokenException e) {
+            return e.reason();
+        }
+        boolean exists = value != null;
+        if (!exists)
+            return doesNotExistResult(true, "exp");
+        double doubleValue = value.doubleValue();
+        return whenCheckTimeMs - confirmNonNegative(allowableClockSkewMs) >= 1000 * doubleValue
+                ? OAuthBearerValidationResult.newFailure(String.format(
+                        "The indicated time (%d ms) minus allowable clock skew (%d ms) was on or after the Expiration Time value (%f seconds)",
+                        whenCheckTimeMs, allowableClockSkewMs, doubleValue))
+                : OAuthBearerValidationResult.newSuccess();
+    }
+
+    /**
+     * Validate the 'iat' (Issued At) and 'exp' (Expiration Time) claims for
+     * internal consistency. The following must be true if both claims exist:
+     * 
+     * <pre>
+     * exp > iat
+     * </pre>
+     * 
+     * @param jwt
+     *            the mandatory JWT to which the validation will be applied
+     * @return the result of the validation
+     */
+    public static OAuthBearerValidationResult validateTimeConsistency(OAuthBearerUnsecuredJws jwt) {
+        Number issuedAt;
+        Number expirationTime;
+        try {
+            issuedAt = Objects.requireNonNull(jwt).issuedAt();
+            expirationTime = jwt.expirationTime();
+        } catch (OAuthBearerIllegalTokenException e) {
+            return e.reason();
+        }
+        if (expirationTime != null && issuedAt != null && expirationTime.doubleValue() <= issuedAt.doubleValue())
+            return OAuthBearerValidationResult.newFailure(
+                    String.format("The Expiration Time time (%f seconds) was not after the Issued At time (%f seconds)",
+                            expirationTime.doubleValue(), issuedAt.doubleValue()));
+        return OAuthBearerValidationResult.newSuccess();
+    }
+
+    /**
+     * Validate the given token's scope against the required scope. Every required
+     * scope element (if any) must exist in the provided token's scope for the
+     * validation to succeed.
+     * 
+     * @param token
+     *            the required token for which the scope will to validate
+     * @param requiredScope
+     *            the optional required scope against which the given token's scope
+     *            will be validated
+     * @return the result of the validation
+     */
+    public static OAuthBearerValidationResult validateScope(OAuthBearerToken token, List<String> requiredScope) {
+        final Set<String> tokenScope = token.scope();
+        if (requiredScope == null || requiredScope.isEmpty())
+            return OAuthBearerValidationResult.newSuccess();
+        for (String requiredScopeElement : requiredScope) {
+            if (!tokenScope.contains(requiredScopeElement))
+                return OAuthBearerValidationResult.newFailure(String.format(
+                        "The provided scope (%s) was mising a required scope (%s).  All required scope elements: %s",
+                        String.valueOf(tokenScope), requiredScopeElement, requiredScope.toString()),
+                        requiredScope.toString(), null);
+        }
+        return OAuthBearerValidationResult.newSuccess();
+    }
+
+    private static int confirmNonNegative(int allowableClockSkewMs) throws OAuthBearerConfigException {
+        if (allowableClockSkewMs < 0)
+            throw new OAuthBearerConfigException(
+                    String.format("Allowable clock skew must not be negative: %d", allowableClockSkewMs));
+        return allowableClockSkewMs;
+    }
+
+    private static OAuthBearerValidationResult doesNotExistResult(boolean required, String claimName) {
+        return required ? OAuthBearerValidationResult.newFailure(String.format("Required claim missing: %s", claimName))
+                : OAuthBearerValidationResult.newSuccess();
+    }
+
+    private OAuthBearerValidationUtils() {
+        // empty
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/SaslConfigsTest.java b/clients/src/test/java/org/apache/kafka/common/config/SaslConfigsTest.java
new file mode 100644
index 0000000..760fcd2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/SaslConfigsTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class SaslConfigsTest {
+    @Test
+    public void testSaslLoginRefreshDefaults() {
+        Map<String, Object> vals = new ConfigDef().withClientSaslSupport().parse(Collections.emptyMap());
+        assertEquals(SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR,
+                vals.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR));
+        assertEquals(SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER,
+                vals.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER));
+        assertEquals(SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
+                vals.get(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS));
+        assertEquals(SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS,
+                vals.get(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS));
+    }
+
+    @Test
+    public void testSaslLoginRefreshMinValuesAreValid() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, "0.5");
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, "0.0");
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, "0");
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0");
+        Map<String, Object> vals = new ConfigDef().withClientSaslSupport().parse(props);
+        assertEquals(Double.valueOf("0.5"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR));
+        assertEquals(Double.valueOf("0.0"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER));
+        assertEquals(Short.valueOf("0"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS));
+        assertEquals(Short.valueOf("0"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS));
+    }
+
+    @Test
+    public void testSaslLoginRefreshMaxValuesAreValid() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, "1.0");
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, "0.25");
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, "900");
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "3600");
+        Map<String, Object> vals = new ConfigDef().withClientSaslSupport().parse(props);
+        assertEquals(Double.valueOf("1.0"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR));
+        assertEquals(Double.valueOf("0.25"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER));
+        assertEquals(Short.valueOf("900"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS));
+        assertEquals(Short.valueOf("3600"), vals.get(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS));
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshWindowFactorMinValueIsReallyMinimum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, "0.499999");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshWindowFactorMaxValueIsReallyMaximum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, "1.0001");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshWindowJitterMinValueIsReallyMinimum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, "-0.000001");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshWindowJitterMaxValueIsReallyMaximum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, "0.251");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshMinPeriodSecondsMinValueIsReallyMinimum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, "-1");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshMinPeriodSecondsMaxValueIsReallyMaximum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, "901");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshBufferSecondsMinValueIsReallyMinimum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "-1");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testSaslLoginRefreshBufferSecondsMaxValueIsReallyMaximum() {
+        Map<Object, Object> props = new HashMap<>();
+        props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "3601");
+        new ConfigDef().withClientSaslSupport().parse(props);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index bfd1d97..d7860ff 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -79,6 +79,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.internal.ScramCredentialUtils;
@@ -1192,6 +1193,37 @@ public class SaslAuthenticatorTest {
         verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
     }
 
+    /**
+     * Tests OAUTHBEARER client and server channels.
+     */
+    @Test
+    public void testValidSaslOauthBearerMechanism() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("OAUTHBEARER", Arrays.asList("OAUTHBEARER"));
+        server = createEchoServer(securityProtocol);
+        createAndCheckClientConnection(securityProtocol, node);
+    }
+
+    /**
+     * Tests OAUTHBEARER fails the connection when the client presents a token with
+     * insufficient scope .
+     */
+    @Test
+    public void testInsufficientScopeSaslOauthBearerMechanism() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("OAUTHBEARER", Arrays.asList("OAUTHBEARER"));
+        // now update the server side to require a scope the client does not provide
+        Map<String, Object> serverJaasConfigOptionsMap = TestJaasConfig.defaultServerOptions("OAUTHBEARER");
+        serverJaasConfigOptionsMap.put("unsecuredValidatorRequiredScope", "LOGIN_TO_KAFKA"); // causes the failure
+        jaasConfig.createOrUpdateEntry("KafkaServer",
+                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", serverJaasConfigOptionsMap);
+        server = createEchoServer(securityProtocol);
+        createAndCheckClientAuthenticationFailure(securityProtocol,
+                "node-" + OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                "{\"status\":\"insufficient_scope\", \"scope\":\"[LOGIN_TO_KAFKA]\"}");
+    }
+
     private void verifySaslAuthenticateHeaderInterop(boolean enableHeaderOnServer, boolean enableHeaderOnClient,
             SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
         configureMechanisms(saslMechanism, Arrays.asList(saslMechanism));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index 3ee7c2c..c579bc1 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -26,6 +26,7 @@ import javax.security.auth.login.Configuration;
 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
 
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.internal.ScramMechanism;
@@ -42,7 +43,7 @@ public class TestJaasConfig extends Configuration {
 
     public static TestJaasConfig createConfiguration(String clientMechanism, List<String> serverMechanisms) {
         TestJaasConfig config = new TestJaasConfig();
-        config.createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions());
+        config.createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions(clientMechanism));
         for (String mechanism : serverMechanisms) {
             config.addEntry(LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism));
         }
@@ -105,6 +106,9 @@ public class TestJaasConfig extends Configuration {
             case "DIGEST-MD5":
                 loginModule = TestDigestLoginModule.class.getName();
                 break;
+            case "OAUTHBEARER":
+                loginModule = OAuthBearerLoginModule.class.getName();
+                break;
             default:
                 if (ScramMechanism.isScram(mechanism))
                     loginModule = ScramLoginModule.class.getName();
@@ -114,6 +118,17 @@ public class TestJaasConfig extends Configuration {
         return loginModule;
     }
 
+    public static Map<String, Object> defaultClientOptions(String mechanism) {
+        switch (mechanism) {
+            case "OAUTHBEARER":
+                Map<String, Object> options = new HashMap<>();
+                options.put("unsecuredLoginStringClaim_sub", USERNAME);
+                return options;
+            default:
+                return defaultClientOptions();
+        }
+    }
+    
     public static Map<String, Object> defaultClientOptions() {
         Map<String, Object> options = new HashMap<>();
         options.put("username", USERNAME);
@@ -128,6 +143,9 @@ public class TestJaasConfig extends Configuration {
             case "DIGEST-MD5":
                 options.put("user_" + USERNAME, PASSWORD);
                 break;
+            case "OAUTHBEARER":
+                options.put("unsecuredLoginStringClaim_sub", USERNAME);
+                break;
             default:
                 if (!ScramMechanism.isScram(mechanism))
                     throw new IllegalArgumentException("Unsupported mechanism " + mechanism);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
new file mode 100644
index 0000000..d883e5e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+public class OAuthBearerLoginModuleTest {
+    private static class TestTokenCallbackHandler implements AuthenticateCallbackHandler {
+        private final OAuthBearerToken[] tokens;
+        private int index = 0;
+
+        public TestTokenCallbackHandler(OAuthBearerToken[] tokens) {
+            this.tokens = Objects.requireNonNull(tokens);
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof OAuthBearerTokenCallback)
+                    try {
+                        handleCallback((OAuthBearerTokenCallback) callback);
+                    } catch (KafkaException e) {
+                        throw new IOException(e.getMessage(), e);
+                    }
+                else
+                    throw new UnsupportedCallbackException(callback);
+            }
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, String saslMechanism,
+                List<AppConfigurationEntry> jaasConfigEntries) {
+            // empty
+        }
+
+        @Override
+        public void close() {
+            // empty
+        }
+
+        private void handleCallback(OAuthBearerTokenCallback callback) throws IOException {
+            if (callback.token() != null)
+                throw new IllegalArgumentException("Callback had a token already");
+            if (tokens.length > index)
+                callback.token(tokens[index++]);
+            else
+                throw new IOException("no more tokens");
+        }
+    }
+
+    @Test
+    public void login1Commit1Login2Commit2Logout1Login3Commit3Logout2() throws LoginException {
+        /*
+         * Invoke login()/commit() on loginModule1; invoke login/commit() on
+         * loginModule2; invoke logout() on loginModule1; invoke login()/commit() on
+         * loginModule3; invoke logout() on loginModule2
+         */
+        Subject subject = new Subject();
+        Set<Object> privateCredentials = subject.getPrivateCredentials();
+
+        // Create callback handler
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
+            EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
+        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
+        TestTokenCallbackHandler testTokenCallbackHandler = new TestTokenCallbackHandler(tokens);
+
+        // Create login modules
+        OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
+        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+        OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
+        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+        OAuthBearerLoginModule loginModule3 = new OAuthBearerLoginModule();
+        loginModule3.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+
+        // Should start with nothing
+        assertEquals(0, privateCredentials.size());
+        loginModule1.login();
+        // Should still have nothing until commit() is called
+        assertEquals(0, privateCredentials.size());
+        loginModule1.commit();
+        // Now we should have the first token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[0], privateCredentials.iterator().next());
+
+        // Now login on loginModule2 to get the second token
+        loginModule2.login();
+        // Should still have just the first token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[0], privateCredentials.iterator().next());
+        loginModule2.commit();
+        // Should have the first and second tokens at this point
+        assertEquals(2, privateCredentials.size());
+        Iterator<Object> iterator = privateCredentials.iterator();
+        assertNotSame(tokens[2], iterator.next());
+        assertNotSame(tokens[2], iterator.next());
+        // finally logout() on loginModule1
+        loginModule1.logout();
+        // Now we should have just the second token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[1], privateCredentials.iterator().next());
+
+        // Now login on loginModule3 to get the third token
+        loginModule3.login();
+        // Should still have just the second token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[1], privateCredentials.iterator().next());
+        loginModule3.commit();
+        // Should have the second and third tokens at this point
+        assertEquals(2, privateCredentials.size());
+        iterator = privateCredentials.iterator();
+        assertNotSame(tokens[0], iterator.next());
+        assertNotSame(tokens[0], iterator.next());
+        // finally logout() on loginModule2
+        loginModule2.logout();
+        // Now we should have just the third token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[2], privateCredentials.iterator().next());
+    }
+
+    @Test
+    public void login1Commit1Logout1Login2Commit2Logout2() throws LoginException {
+        /*
+         * Invoke login()/commit() on loginModule1; invoke logout() on loginModule1;
+         * invoke login()/commit() on loginModule2; invoke logout() on loginModule2
+         */
+        Subject subject = new Subject();
+        Set<Object> privateCredentials = subject.getPrivateCredentials();
+
+        // Create callback handler
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
+            EasyMock.mock(OAuthBearerToken.class)};
+        EasyMock.replay(tokens[0], tokens[1]); // expect nothing
+        TestTokenCallbackHandler testTokenCallbackHandler = new TestTokenCallbackHandler(tokens);
+
+        // Create login modules
+        OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
+        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+        OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
+        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+
+        // Should start with nothing
+        assertEquals(0, privateCredentials.size());
+        loginModule1.login();
+        // Should still have nothing until commit() is called
+        assertEquals(0, privateCredentials.size());
+        loginModule1.commit();
+        // Now we should have the first token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[0], privateCredentials.iterator().next());
+        loginModule1.logout();
+        // Should have nothing again
+        assertEquals(0, privateCredentials.size());
+
+        loginModule2.login();
+        // Should still have nothing until commit() is called
+        assertEquals(0, privateCredentials.size());
+        loginModule2.commit();
+        // Now we should have the second token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[1], privateCredentials.iterator().next());
+        loginModule2.logout();
+        // Should have nothing again
+        assertEquals(0, privateCredentials.size());
+    }
+
+    @Test
+    public void loginAbortLoginCommitLogout() throws LoginException {
+        /*
+         * Invoke login(); invoke abort(); invoke login(); logout()
+         */
+        Subject subject = new Subject();
+        Set<Object> privateCredentials = subject.getPrivateCredentials();
+
+        // Create callback handler
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
+            EasyMock.mock(OAuthBearerToken.class)};
+        EasyMock.replay(tokens[0], tokens[1]); // expect nothing
+        TestTokenCallbackHandler testTokenCallbackHandler = new TestTokenCallbackHandler(tokens);
+
+        // Create login module
+        OAuthBearerLoginModule loginModule = new OAuthBearerLoginModule();
+        loginModule.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+
+        // Should start with nothing
+        assertEquals(0, privateCredentials.size());
+        loginModule.login();
+        // Should still have nothing until commit() is called
+        assertEquals(0, privateCredentials.size());
+        loginModule.abort();
+        // Should still have nothing since we aborted
+        assertEquals(0, privateCredentials.size());
+
+        loginModule.login();
+        // Should still have nothing until commit() is called
+        assertEquals(0, privateCredentials.size());
+        loginModule.commit();
+        // Now we should have the second token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[1], privateCredentials.iterator().next());
+        loginModule.logout();
+        // Should have nothing again
+        assertEquals(0, privateCredentials.size());
+    }
+
+    @Test
+    public void login1Commit1Login2Abort2Login3Commit3Logout3() throws LoginException {
+        /*
+         * Invoke login()/commit() on loginModule1; invoke login()/abort() on
+         * loginModule2; invoke login()/commit()/logout() on loginModule3
+         */
+        Subject subject = new Subject();
+        Set<Object> privateCredentials = subject.getPrivateCredentials();
+
+        // Create callback handler
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
+            EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
+        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
+        TestTokenCallbackHandler testTokenCallbackHandler = new TestTokenCallbackHandler(tokens);
+
+        // Create login modules
+        OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
+        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+        OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
+        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+        OAuthBearerLoginModule loginModule3 = new OAuthBearerLoginModule();
+        loginModule3.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
+                Collections.<String, Object>emptyMap());
+
+        // Should start with nothing
+        assertEquals(0, privateCredentials.size());
+        loginModule1.login();
+        // Should still have nothing until commit() is called
+        assertEquals(0, privateCredentials.size());
+        loginModule1.commit();
+        // Now we should have the first token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[0], privateCredentials.iterator().next());
+
+        // Now go get the second token
+        loginModule2.login();
+        // Should still have first token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[0], privateCredentials.iterator().next());
+        loginModule2.abort();
+        // Should still have just the first token because we aborted
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[0], privateCredentials.iterator().next());
+
+        // Now go get the third token
+        loginModule2.login();
+        // Should still have first token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[0], privateCredentials.iterator().next());
+        loginModule2.commit();
+        // Should have first and third tokens at this point
+        assertEquals(2, privateCredentials.size());
+        Iterator<Object> iterator = privateCredentials.iterator();
+        assertNotSame(tokens[1], iterator.next());
+        assertNotSame(tokens[1], iterator.next());
+        loginModule1.logout();
+        // Now we should have just the third token
+        assertEquals(1, privateCredentials.size());
+        assertSame(tokens[2], privateCredentials.iterator().next());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallbackTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallbackTest.java
new file mode 100644
index 0000000..be97ea2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallbackTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Test;
+
+public class OAuthBearerTokenCallbackTest {
+    private static final OAuthBearerToken TOKEN = new OAuthBearerToken() {
+        @Override
+        public String value() {
+            return "value";
+        }
+
+        @Override
+        public Long startTimeMs() {
+            return null;
+        }
+
+        @Override
+        public Set<String> scope() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public String principalName() {
+            return "principalName";
+        }
+
+        @Override
+        public long lifetimeMs() {
+            return 0;
+        }
+    };
+
+    @Test
+    public void testError() {
+        String errorCode = "errorCode";
+        String errorDescription = "errorDescription";
+        String errorUri = "errorUri";
+        OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+        callback.error(errorCode, errorDescription, errorUri);
+        assertEquals(errorCode, callback.errorCode());
+        assertEquals(errorDescription, callback.errorDescription());
+        assertEquals(errorUri, callback.errorUri());
+        assertNull(callback.token());
+    }
+
+    @Test
+    public void testToken() {
+        OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+        callback.token(TOKEN);
+        assertSame(TOKEN, callback.token());
+        assertNull(callback.errorCode());
+        assertNull(callback.errorDescription());
+        assertNull(callback.errorUri());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackTest.java
new file mode 100644
index 0000000..ed82766
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Test;
+
+public class OAuthBearerValidatorCallbackTest {
+    private static final OAuthBearerToken TOKEN = new OAuthBearerToken() {
+        @Override
+        public String value() {
+            return "value";
+        }
+
+        @Override
+        public Long startTimeMs() {
+            return null;
+        }
+
+        @Override
+        public Set<String> scope() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public String principalName() {
+            return "principalName";
+        }
+
+        @Override
+        public long lifetimeMs() {
+            return 0;
+        }
+    };
+
+    @Test
+    public void testError() {
+        String errorStatus = "errorStatus";
+        String errorScope = "errorScope";
+        String errorOpenIDConfiguration = "errorOpenIDConfiguration";
+        OAuthBearerValidatorCallback callback = new OAuthBearerValidatorCallback(TOKEN.value());
+        callback.error(errorStatus, errorScope, errorOpenIDConfiguration);
+        assertEquals(errorStatus, callback.errorStatus());
+        assertEquals(errorScope, callback.errorScope());
+        assertEquals(errorOpenIDConfiguration, callback.errorOpenIDConfiguration());
+        assertNull(callback.token());
+    }
+
+    @Test
+    public void testToken() {
+        OAuthBearerValidatorCallback callback = new OAuthBearerValidatorCallback(TOKEN.value());
+        callback.token(TOKEN);
+        assertSame(TOKEN, callback.token());
+        assertNull(callback.errorStatus());
+        assertNull(callback.errorScope());
+        assertNull(callback.errorOpenIDConfiguration());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServerTest.java
new file mode 100644
index 0000000..2252ede
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/OAuthBearerSaslServerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.security.oauthbearer.internal.unsecured.OAuthBearerConfigException;
+import org.apache.kafka.common.security.oauthbearer.internal.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.internal.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OAuthBearerSaslServerTest {
+    private static final String USER = "user";
+    private static final Map<String, ?> CONFIGS;
+    static {
+        String jaasConfigText = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required"
+                + " unsecuredLoginStringClaim_sub=\"" + USER + "\";";
+        Map<String, Object> tmp = new HashMap<>();
+        tmp.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigText));
+        CONFIGS = Collections.unmodifiableMap(tmp);
+    }
+    private static final AuthenticateCallbackHandler LOGIN_CALLBACK_HANDLER;
+    static {
+        LOGIN_CALLBACK_HANDLER = new OAuthBearerUnsecuredLoginCallbackHandler();
+        LOGIN_CALLBACK_HANDLER.configure(CONFIGS, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                JaasContext.loadClientContext(CONFIGS).configurationEntries());
+    }
+    private static final AuthenticateCallbackHandler VALIDATOR_CALLBACK_HANDLER;
+    static {
+        VALIDATOR_CALLBACK_HANDLER = new OAuthBearerUnsecuredValidatorCallbackHandler();
+        VALIDATOR_CALLBACK_HANDLER.configure(CONFIGS, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                JaasContext.loadClientContext(CONFIGS).configurationEntries());
+    }
+    private OAuthBearerSaslServer saslServer;
+
+    @Before
+    public void setUp() throws Exception {
+        saslServer = new OAuthBearerSaslServer(VALIDATOR_CALLBACK_HANDLER);
+    }
+
+    @Test
+    public void noAuthorizationIdSpecified() throws Exception {
+        byte[] nextChallenge = saslServer
+                .evaluateResponse(clientInitialResponseText(null).getBytes(StandardCharsets.UTF_8));
+        assertTrue("Next challenge is not empty", nextChallenge.length == 0);
+    }
+
+    @Test
+    public void authorizatonIdEqualsAuthenticationId() throws Exception {
+        byte[] nextChallenge = saslServer
+                .evaluateResponse(clientInitialResponseText(USER).getBytes(StandardCharsets.UTF_8));
+        assertTrue("Next challenge is not empty", nextChallenge.length == 0);
+    }
+
+    @Test(expected = SaslAuthenticationException.class)
+    public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
+        saslServer.evaluateResponse(clientInitialResponseText(USER + "x").getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void illegalToken() throws Exception {
+        byte[] bytes = saslServer
+                .evaluateResponse((clientInitialResponseText(null) + "AB").getBytes(StandardCharsets.UTF_8));
+        String challenge = new String(bytes, StandardCharsets.UTF_8);
+        assertEquals("{\"status\":\"invalid_token\"}", challenge);
+    }
+
+    private String clientInitialResponseText(String authorizationId)
+            throws OAuthBearerConfigException, IOException, UnsupportedCallbackException, LoginException {
+        OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+        LOGIN_CALLBACK_HANDLER.handle(new Callback[] {callback});
+        OAuthBearerToken token = callback.token();
+        String compactSerialization = token.value();
+        String clientInitialResponseText = "n,"
+                + (authorizationId == null || authorizationId.isEmpty() ? "" : "a=" + authorizationId) + ",auth=Bearer "
+                + compactSerialization;
+        return clientInitialResponseText;
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshConfigTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshConfigTest.java
new file mode 100644
index 0000000..5128a33
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshConfigTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.expiring;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.junit.Test;
+
+public class ExpiringCredentialRefreshConfigTest {
+    @Test
+    public void fromGoodConfig() {
+        ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig = new ExpiringCredentialRefreshConfig(
+                new ConfigDef().withClientSaslSupport().parse(Collections.emptyMap()), true);
+        assertEquals(Double.valueOf(SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR),
+                Double.valueOf(expiringCredentialRefreshConfig.loginRefreshWindowFactor()));
+        assertEquals(Double.valueOf(SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER),
+                Double.valueOf(expiringCredentialRefreshConfig.loginRefreshWindowJitter()));
+        assertEquals(Short.valueOf(SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS),
+                Short.valueOf(expiringCredentialRefreshConfig.loginRefreshMinPeriodSeconds()));
+        assertEquals(Short.valueOf(SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS),
+                Short.valueOf(expiringCredentialRefreshConfig.loginRefreshBufferSeconds()));
+        assertTrue(expiringCredentialRefreshConfig.loginRefreshReloginAllowedBeforeLogout());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshingLoginTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshingLoginTest.java
new file mode 100644
index 0000000..96fcc6a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.expiring;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.security.oauthbearer.internal.expiring.ExpiringCredentialRefreshingLogin.LoginContextFactory;
+import org.apache.kafka.common.utils.MockScheduler;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+public class ExpiringCredentialRefreshingLoginTest {
+    private static final Configuration EMPTY_WILDCARD_CONFIGURATION;
+    static {
+        EMPTY_WILDCARD_CONFIGURATION = new Configuration() {
+            @Override
+            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+                return new AppConfigurationEntry[0]; // match any name
+            }
+        };
+    }
+
+    /*
+     * An ExpiringCredentialRefreshingLogin that we can tell explicitly to
+     * create/remove an expiring credential with specific
+     * create/expire/absoluteLastRefresh times
+     */
+    private static class TestExpiringCredentialRefreshingLogin extends ExpiringCredentialRefreshingLogin {
+        private ExpiringCredential expiringCredential;
+        private ExpiringCredential tmpExpiringCredential;
+        private final Time time;
+        private final long lifetimeMillis;
+        private final long absoluteLastRefreshTimeMs;
+        private final boolean clientReloginAllowedBeforeLogout;
+
+        public TestExpiringCredentialRefreshingLogin(ExpiringCredentialRefreshConfig refreshConfig,
+                LoginContextFactory loginContextFactory, Time time, final long lifetimeMillis,
+                final long absoluteLastRefreshMs, boolean clientReloginAllowedBeforeLogout) {
+            super("contextName", EMPTY_WILDCARD_CONFIGURATION, refreshConfig, null,
+                    TestExpiringCredentialRefreshingLogin.class, loginContextFactory, Objects.requireNonNull(time));
+            this.time = time;
+            this.lifetimeMillis = lifetimeMillis;
+            this.absoluteLastRefreshTimeMs = absoluteLastRefreshMs;
+            this.clientReloginAllowedBeforeLogout = clientReloginAllowedBeforeLogout;
+        }
+
+        /*
+         * Invoke at login time
+         */
+        public void createNewExpiringCredential() {
+            if (!clientReloginAllowedBeforeLogout)
+                /*
+                 * Was preceded by logout
+                 */
+                expiringCredential = internalNewExpiringCredential();
+            else {
+                boolean initialLogin = expiringCredential == null;
+                if (initialLogin)
+                    // no logout immediately after the initial login
+                    this.expiringCredential = internalNewExpiringCredential();
+                else
+                    /*
+                     * This is at least the second invocation of login; we will move the credential
+                     * over upon logout, which should be invoked next
+                     */
+                    this.tmpExpiringCredential = internalNewExpiringCredential();
+            }
+        }
+
+        /*
+         * Invoke at logout time
+         */
+        public void clearExpiringCredential() {
+            if (!clientReloginAllowedBeforeLogout)
+                /*
+                 * Have not yet invoked login
+                 */
+                expiringCredential = null;
+            else
+                /*
+                 * login has already been invoked
+                 */
+                expiringCredential = tmpExpiringCredential;
+        }
+
+        @Override
+        public ExpiringCredential expiringCredential() {
+            return expiringCredential;
+        }
+
+        private ExpiringCredential internalNewExpiringCredential() {
+            return new ExpiringCredential() {
+                private final long createMs = time.milliseconds();
+                private final long expireTimeMs = createMs + lifetimeMillis;
+
+                @Override
+                public String principalName() {
+                    return "Created at " + new Date(createMs);
+                }
+
+                @Override
+                public Long startTimeMs() {
+                    return createMs;
+                }
+
+                @Override
+                public long expireTimeMs() {
+                    return expireTimeMs;
+                }
+
+                @Override
+                public Long absoluteLastRefreshTimeMs() {
+                    return absoluteLastRefreshTimeMs;
+                }
+
+                // useful in debugger
+                @Override
+                public String toString() {
+                    return String.format("startTimeMs=%d, expireTimeMs=%d, absoluteLastRefreshTimeMs=%s", startTimeMs(),
+                            expireTimeMs(), absoluteLastRefreshTimeMs());
+                }
+
+            };
+        }
+    }
+
+    /*
+     * A class that will forward all login/logout/getSubject() calls to a mock while
+     * also telling an instance of TestExpiringCredentialRefreshingLogin to
+     * create/remove an expiring credential upon login/logout(). Basically we are
+     * getting the functionality of a mock while simultaneously in the same method
+     * call performing creation/removal of expiring credentials.
+     */
+    private static class TestLoginContext extends LoginContext {
+        private final TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin;
+        private final LoginContext mockLoginContext;
+
+        public TestLoginContext(TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin,
+                LoginContext mockLoginContext) throws LoginException {
+            super("contextName", null, null, EMPTY_WILDCARD_CONFIGURATION);
+            this.testExpiringCredentialRefreshingLogin = Objects.requireNonNull(testExpiringCredentialRefreshingLogin);
+            // sanity check to make sure it is likely a mock
+            if (Objects.requireNonNull(mockLoginContext).getClass().equals(LoginContext.class)
+                    || mockLoginContext.getClass().equals(getClass()))
+                throw new IllegalArgumentException();
+            this.mockLoginContext = mockLoginContext;
+        }
+
+        @Override
+        public void login() throws LoginException {
+            /*
+             * Here is where we get the functionality of a mock while simultaneously
+             * performing the creation of an expiring credential
+             */
+            mockLoginContext.login();
+            testExpiringCredentialRefreshingLogin.createNewExpiringCredential();
+        }
+
+        @Override
+        public void logout() throws LoginException {
+            /*
+             * Here is where we get the functionality of a mock while simultaneously
+             * performing the removal of an expiring credential
+             */
+            mockLoginContext.logout();
+            testExpiringCredentialRefreshingLogin.clearExpiringCredential();
+        }
+
+        @Override
+        public Subject getSubject() {
+            // here we just need the functionality of a mock
+            return mockLoginContext.getSubject();
+        }
+    }
+
+    /*
+     * An implementation of LoginContextFactory that returns an instance of
+     * TestLoginContext
+     */
+    private static class TestLoginContextFactory extends LoginContextFactory {
+        private final KafkaFutureImpl<Object> refresherThreadStartedFuture = new KafkaFutureImpl<>();
+        private final KafkaFutureImpl<Object> refresherThreadDoneFuture = new KafkaFutureImpl<>();
+        private TestLoginContext testLoginContext;
+
+        public void configure(LoginContext mockLoginContext,
+                TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin) throws LoginException {
+            // sanity check to make sure it is likely a mock
+            if (Objects.requireNonNull(mockLoginContext).getClass().equals(LoginContext.class)
+                    || mockLoginContext.getClass().equals(TestLoginContext.class))
+                throw new IllegalArgumentException();
+            this.testLoginContext = new TestLoginContext(Objects.requireNonNull(testExpiringCredentialRefreshingLogin),
+                    mockLoginContext);
+        }
+
+        @Override
+        public LoginContext createLoginContext(ExpiringCredentialRefreshingLogin expiringCredentialRefreshingLogin) {
+            return testLoginContext;
+        }
+
+        @Override
+        public void refresherThreadStarted() {
+            refresherThreadStartedFuture.complete(null);
+        }
+
+        @Override
+        public void refresherThreadDone() {
+            refresherThreadDoneFuture.complete(null);
+        }
+
+        public Future<?> refresherThreadStartedFuture() {
+            return refresherThreadStartedFuture;
+        }
+
+        public Future<?> refresherThreadDoneFuture() {
+            return refresherThreadDoneFuture;
+        }
+    }
+
+    @Test
+    public void testRefresh() throws Exception {
+        for (int numExpectedRefreshes : new int[] {0, 1, 2}) {
+            for (boolean clientReloginAllowedBeforeLogout : new boolean[] {true, false}) {
+                Subject subject = new Subject();
+                /*
+                 * Create a mock and record the fact that we expect login() to be invoked
+                 * followed by getSubject() and then ultimately followed by numExpectedRefreshes
+                 * pairs of either login()/logout() or logout()/login() calls
+                 */
+                final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
+                mockLoginContext.login();
+                EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
+                for (int i = 0; i < numExpectedRefreshes; ++i) {
+                    if (clientReloginAllowedBeforeLogout) {
+                        mockLoginContext.login();
+                        mockLoginContext.logout();
+                    } else {
+                        mockLoginContext.logout();
+                        mockLoginContext.login();
+                    }
+                }
+                EasyMock.replay(mockLoginContext);
+
+                MockTime mockTime = new MockTime();
+                long startMs = mockTime.milliseconds();
+                /*
+                 * Identify the lifetime of each expiring credential
+                 */
+                long lifetimeMinutes = 100L;
+                /*
+                 * Identify the point at which refresh will occur in that lifetime
+                 */
+                long refreshEveryMinutes = 80L;
+                /*
+                 * Set an absolute last refresh time that will cause the login thread to exit
+                 * after a certain number of re-logins (by adding an extra half of a refresh
+                 * interval).
+                 */
+                long absoluteLastRefreshMs = startMs + (1 + numExpectedRefreshes) * 1000 * 60 * refreshEveryMinutes
+                        - 1000 * 60 * refreshEveryMinutes / 2;
+                /*
+                 * Identify buffer time on either side for the refresh algorithm
+                 */
+                short minPeriodSeconds = (short) 0;
+                short bufferSeconds = minPeriodSeconds;
+
+                /*
+                 * Define some listeners so we can keep track of who gets done and when. All
+                 * added listeners should end up done except the last, extra one, which should
+                 * not.
+                 */
+                MockScheduler mockScheduler = new MockScheduler(mockTime);
+                List<KafkaFutureImpl<Long>> waiters = addWaiters(mockScheduler, 1000 * 60 * refreshEveryMinutes,
+                        numExpectedRefreshes + 1);
+
+                // Create the ExpiringCredentialRefreshingLogin instance under test
+                TestLoginContextFactory testLoginContextFactory = new TestLoginContextFactory();
+                TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin = new TestExpiringCredentialRefreshingLogin(
+                        refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime(
+                                1.0 * refreshEveryMinutes / lifetimeMinutes, minPeriodSeconds, bufferSeconds,
+                                clientReloginAllowedBeforeLogout),
+                        testLoginContextFactory, mockTime, 1000 * 60 * lifetimeMinutes, absoluteLastRefreshMs,
+                        clientReloginAllowedBeforeLogout);
+                testLoginContextFactory.configure(mockLoginContext, testExpiringCredentialRefreshingLogin);
+
+                /*
+                 * Perform the login, wait up to a certain amount of time for the refresher
+                 * thread to exit, and make sure the correct calls happened at the correct times
+                 */
+                long expectedFinalMs = startMs + numExpectedRefreshes * 1000 * 60 * refreshEveryMinutes;
+                assertFalse(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+                assertFalse(testLoginContextFactory.refresherThreadDoneFuture().isDone());
+                testExpiringCredentialRefreshingLogin.login();
+                assertTrue(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+                testLoginContextFactory.refresherThreadDoneFuture().get(1L, TimeUnit.SECONDS);
+                assertEquals(expectedFinalMs, mockTime.milliseconds());
+                for (int i = 0; i < numExpectedRefreshes; ++i) {
+                    KafkaFutureImpl<Long> waiter = waiters.get(i);
+                    assertTrue(waiter.isDone());
+                    assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs);
+                }
+                assertFalse(waiters.get(numExpectedRefreshes).isDone());
+            }
+        }
+    }
+
+    @Test
+    public void testRefreshWithExpirationSmallerThanConfiguredBuffers() throws Exception {
+        int numExpectedRefreshes = 1;
+        boolean clientReloginAllowedBeforeLogout = true;
+        Subject subject = new Subject();
+        /*
+         * Create a mock and record the fact that we expect login() to be invoked
+         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
+         * pairs of login()/logout() calls
+         */
+        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
+        mockLoginContext.login();
+        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            mockLoginContext.login();
+            mockLoginContext.logout();
+        }
+        EasyMock.replay(mockLoginContext);
+
+        MockTime mockTime = new MockTime();
+        long startMs = mockTime.milliseconds();
+        /*
+         * Identify the lifetime of each expiring credential
+         */
+        long lifetimeMinutes = 10L;
+        /*
+         * Identify the point at which refresh will occur in that lifetime
+         */
+        long refreshEveryMinutes = 8L;
+        /*
+         * Set an absolute last refresh time that will cause the login thread to exit
+         * after a certain number of re-logins (by adding an extra half of a refresh
+         * interval).
+         */
+        long absoluteLastRefreshMs = startMs + (1 + numExpectedRefreshes) * 1000 * 60 * refreshEveryMinutes
+                - 1000 * 60 * refreshEveryMinutes / 2;
+        /*
+         * Identify buffer time on either side for the refresh algorithm that will cause
+         * the entire lifetime to be taken up. In other words, make sure there is no way
+         * to honor the buffers.
+         */
+        short minPeriodSeconds = (short) (1 + lifetimeMinutes * 60 / 2);
+        short bufferSeconds = minPeriodSeconds;
+
+        /*
+         * Define some listeners so we can keep track of who gets done and when. All
+         * added listeners should end up done except the last, extra one, which should
+         * not.
+         */
+        MockScheduler mockScheduler = new MockScheduler(mockTime);
+        List<KafkaFutureImpl<Long>> waiters = addWaiters(mockScheduler, 1000 * 60 * refreshEveryMinutes,
+                numExpectedRefreshes + 1);
+
+        // Create the ExpiringCredentialRefreshingLogin instance under test
+        TestLoginContextFactory testLoginContextFactory = new TestLoginContextFactory();
+        TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin = new TestExpiringCredentialRefreshingLogin(
+                refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime(
+                        1.0 * refreshEveryMinutes / lifetimeMinutes, minPeriodSeconds, bufferSeconds,
+                        clientReloginAllowedBeforeLogout),
+                testLoginContextFactory, mockTime, 1000 * 60 * lifetimeMinutes, absoluteLastRefreshMs,
+                clientReloginAllowedBeforeLogout);
+        testLoginContextFactory.configure(mockLoginContext, testExpiringCredentialRefreshingLogin);
+
+        /*
+         * Perform the login, wait up to a certain amount of time for the refresher
+         * thread to exit, and make sure the correct calls happened at the correct times
+         */
+        long expectedFinalMs = startMs + numExpectedRefreshes * 1000 * 60 * refreshEveryMinutes;
+        assertFalse(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        assertFalse(testLoginContextFactory.refresherThreadDoneFuture().isDone());
+        testExpiringCredentialRefreshingLogin.login();
+        assertTrue(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        testLoginContextFactory.refresherThreadDoneFuture().get(1L, TimeUnit.SECONDS);
+        assertEquals(expectedFinalMs, mockTime.milliseconds());
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            KafkaFutureImpl<Long> waiter = waiters.get(i);
+            assertTrue(waiter.isDone());
+            assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs);
+        }
+        assertFalse(waiters.get(numExpectedRefreshes).isDone());
+    }
+
+    @Test
+    public void testRefreshWithMinPeriodIntrusion() throws Exception {
+        int numExpectedRefreshes = 1;
+        boolean clientReloginAllowedBeforeLogout = true;
+        Subject subject = new Subject();
+        /*
+         * Create a mock and record the fact that we expect login() to be invoked
+         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
+         * pairs of login()/logout() calls
+         */
+        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
+        mockLoginContext.login();
+        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            mockLoginContext.login();
+            mockLoginContext.logout();
+        }
+        EasyMock.replay(mockLoginContext);
+
+        MockTime mockTime = new MockTime();
+        long startMs = mockTime.milliseconds();
+        /*
+         * Identify the lifetime of each expiring credential
+         */
+        long lifetimeMinutes = 10L;
+        /*
+         * Identify the point at which refresh will occur in that lifetime
+         */
+        long refreshEveryMinutes = 8L;
+        /*
+         * Set an absolute last refresh time that will cause the login thread to exit
+         * after a certain number of re-logins (by adding an extra half of a refresh
+         * interval).
+         */
+        long absoluteLastRefreshMs = startMs + (1 + numExpectedRefreshes) * 1000 * 60 * refreshEveryMinutes
+                - 1000 * 60 * refreshEveryMinutes / 2;
+
+        /*
+         * Identify a minimum period that will cause the refresh time to be delayed a
+         * bit.
+         */
+        int bufferIntrusionSeconds = 1;
+        short minPeriodSeconds = (short) (refreshEveryMinutes * 60 + bufferIntrusionSeconds);
+        short bufferSeconds = (short) 0;
+
+        /*
+         * Define some listeners so we can keep track of who gets done and when. All
+         * added listeners should end up done except the last, extra one, which should
+         * not.
+         */
+        MockScheduler mockScheduler = new MockScheduler(mockTime);
+        List<KafkaFutureImpl<Long>> waiters = addWaiters(mockScheduler,
+                1000 * (60 * refreshEveryMinutes + bufferIntrusionSeconds), numExpectedRefreshes + 1);
+
+        // Create the ExpiringCredentialRefreshingLogin instance under test
+        TestLoginContextFactory testLoginContextFactory = new TestLoginContextFactory();
+        TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin = new TestExpiringCredentialRefreshingLogin(
+                refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime(
+                        1.0 * refreshEveryMinutes / lifetimeMinutes, minPeriodSeconds, bufferSeconds,
+                        clientReloginAllowedBeforeLogout),
+                testLoginContextFactory, mockTime, 1000 * 60 * lifetimeMinutes, absoluteLastRefreshMs,
+                clientReloginAllowedBeforeLogout);
+        testLoginContextFactory.configure(mockLoginContext, testExpiringCredentialRefreshingLogin);
+
+        /*
+         * Perform the login, wait up to a certain amount of time for the refresher
+         * thread to exit, and make sure the correct calls happened at the correct times
+         */
+        long expectedFinalMs = startMs
+                + numExpectedRefreshes * 1000 * (60 * refreshEveryMinutes + bufferIntrusionSeconds);
+        assertFalse(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        assertFalse(testLoginContextFactory.refresherThreadDoneFuture().isDone());
+        testExpiringCredentialRefreshingLogin.login();
+        assertTrue(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        testLoginContextFactory.refresherThreadDoneFuture().get(1L, TimeUnit.SECONDS);
+        assertEquals(expectedFinalMs, mockTime.milliseconds());
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            KafkaFutureImpl<Long> waiter = waiters.get(i);
+            assertTrue(waiter.isDone());
+            assertEquals((i + 1) * 1000 * (60 * refreshEveryMinutes + bufferIntrusionSeconds),
+                    waiter.get().longValue() - startMs);
+        }
+        assertFalse(waiters.get(numExpectedRefreshes).isDone());
+    }
+
+    @Test
+    public void testRefreshWithPreExpirationBufferIntrusion() throws Exception {
+        int numExpectedRefreshes = 1;
+        boolean clientReloginAllowedBeforeLogout = true;
+        Subject subject = new Subject();
+        /*
+         * Create a mock and record the fact that we expect login() to be invoked
+         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
+         * pairs of login()/logout() calls
+         */
+        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
+        mockLoginContext.login();
+        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            mockLoginContext.login();
+            mockLoginContext.logout();
+        }
+        EasyMock.replay(mockLoginContext);
+
+        MockTime mockTime = new MockTime();
+        long startMs = mockTime.milliseconds();
+        /*
+         * Identify the lifetime of each expiring credential
+         */
+        long lifetimeMinutes = 10L;
+        /*
+         * Identify the point at which refresh will occur in that lifetime
+         */
+        long refreshEveryMinutes = 8L;
+        /*
+         * Set an absolute last refresh time that will cause the login thread to exit
+         * after a certain number of re-logins (by adding an extra half of a refresh
+         * interval).
+         */
+        long absoluteLastRefreshMs = startMs + (1 + numExpectedRefreshes) * 1000 * 60 * refreshEveryMinutes
+                - 1000 * 60 * refreshEveryMinutes / 2;
+        /*
+         * Identify a minimum period that will cause the refresh time to be delayed a
+         * bit.
+         */
+        int bufferIntrusionSeconds = 1;
+        short bufferSeconds = (short) ((lifetimeMinutes - refreshEveryMinutes) * 60 + bufferIntrusionSeconds);
+        short minPeriodSeconds = (short) 0;
+
+        /*
+         * Define some listeners so we can keep track of who gets done and when. All
+         * added listeners should end up done except the last, extra one, which should
+         * not.
+         */
+        MockScheduler mockScheduler = new MockScheduler(mockTime);
+        List<KafkaFutureImpl<Long>> waiters = addWaiters(mockScheduler,
+                1000 * (60 * refreshEveryMinutes - bufferIntrusionSeconds), numExpectedRefreshes + 1);
+
+        // Create the ExpiringCredentialRefreshingLogin instance under test
+        TestLoginContextFactory testLoginContextFactory = new TestLoginContextFactory();
+        TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin = new TestExpiringCredentialRefreshingLogin(
+                refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime(
+                        1.0 * refreshEveryMinutes / lifetimeMinutes, minPeriodSeconds, bufferSeconds,
+                        clientReloginAllowedBeforeLogout),
+                testLoginContextFactory, mockTime, 1000 * 60 * lifetimeMinutes, absoluteLastRefreshMs,
+                clientReloginAllowedBeforeLogout);
+        testLoginContextFactory.configure(mockLoginContext, testExpiringCredentialRefreshingLogin);
+
+        /*
+         * Perform the login, wait up to a certain amount of time for the refresher
+         * thread to exit, and make sure the correct calls happened at the correct times
+         */
+        long expectedFinalMs = startMs
+                + numExpectedRefreshes * 1000 * (60 * refreshEveryMinutes - bufferIntrusionSeconds);
+        assertFalse(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        assertFalse(testLoginContextFactory.refresherThreadDoneFuture().isDone());
+        testExpiringCredentialRefreshingLogin.login();
+        assertTrue(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        testLoginContextFactory.refresherThreadDoneFuture().get(1L, TimeUnit.SECONDS);
+        assertEquals(expectedFinalMs, mockTime.milliseconds());
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            KafkaFutureImpl<Long> waiter = waiters.get(i);
+            assertTrue(waiter.isDone());
+            assertEquals((i + 1) * 1000 * (60 * refreshEveryMinutes - bufferIntrusionSeconds),
+                    waiter.get().longValue() - startMs);
+        }
+        assertFalse(waiters.get(numExpectedRefreshes).isDone());
+    }
+
+    private static List<KafkaFutureImpl<Long>> addWaiters(MockScheduler mockScheduler, long refreshEveryMillis,
+            int numWaiters) {
+        List<KafkaFutureImpl<Long>> retvalWaiters = new ArrayList<>(numWaiters);
+        for (int i = 1; i <= numWaiters; ++i) {
+            KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<Long>();
+            mockScheduler.addWaiter(i * refreshEveryMillis, waiter);
+            retvalWaiters.add(waiter);
+        }
+        return retvalWaiters;
+    }
+
+    private static ExpiringCredentialRefreshConfig refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime(
+            double refreshWindowFactor, short minPeriodSeconds, short bufferSeconds,
+            boolean clientReloginAllowedBeforeLogout) {
+        Map<Object, Object> configs = new HashMap<>();
+        configs.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, refreshWindowFactor);
+        configs.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, 0);
+        configs.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, minPeriodSeconds);
+        configs.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, bufferSeconds);
+        return new ExpiringCredentialRefreshConfig(new ConfigDef().withClientSaslSupport().parse(configs),
+                clientReloginAllowedBeforeLogout);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerScopeUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerScopeUtilsTest.java
new file mode 100644
index 0000000..087c0fc
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerScopeUtilsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.junit.Test;
+
+public class OAuthBearerScopeUtilsTest {
+    @Test
+    public void validScope() {
+        for (String validScope : new String[] {"", "   ", "scope1", " scope1 ", "scope1 Scope2", "scope1   Scope2"}) {
+            List<String> parsedScope = OAuthBearerScopeUtils.parseScope(validScope);
+            if (validScope.trim().isEmpty()) {
+                assertTrue(parsedScope.isEmpty());
+            } else if (validScope.contains("Scope2")) {
+                assertTrue(parsedScope.size() == 2 && parsedScope.get(0).equals("scope1")
+                        && parsedScope.get(1).equals("Scope2"));
+            } else {
+                assertTrue(parsedScope.size() == 1 && parsedScope.get(0).equals("scope1"));
+            }
+        }
+    }
+
+    @Test
+    public void invalidScope() {
+        for (String invalidScope : new String[] {"\"foo", "\\foo"}) {
+            try {
+                OAuthBearerScopeUtils.parseScope(invalidScope);
+                fail("did not detect invalid scope: " + invalidScope);
+            } catch (OAuthBearerConfigException expected) {
+                // empty
+            }
+        }
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredJwsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredJwsTest.java
new file mode 100644
index 0000000..5c4bf73
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredJwsTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Test;
+
+public class OAuthBearerUnsecuredJwsTest {
+    private static final String QUOTE = "\"";
+    private static final String HEADER_COMPACT_SERIALIZATION = Base64.getUrlEncoder().withoutPadding()
+            .encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8)) + ".";
+
+    @Test
+    public void validClaims() throws OAuthBearerIllegalTokenException {
+        double issuedAtSeconds = 100.1;
+        double expirationTimeSeconds = 300.3;
+        StringBuilder sb = new StringBuilder("{");
+        appendJsonText(sb, "sub", "SUBJECT");
+        appendCommaJsonText(sb, "iat", issuedAtSeconds);
+        appendCommaJsonText(sb, "exp", expirationTimeSeconds);
+        sb.append("}");
+        String compactSerialization = HEADER_COMPACT_SERIALIZATION
+                + Base64.getUrlEncoder().withoutPadding().encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8))
+                + ".";
+        OAuthBearerUnsecuredJws testJwt = new OAuthBearerUnsecuredJws(compactSerialization, "sub", "scope");
+        assertEquals(compactSerialization, testJwt.value());
+        assertEquals("sub", testJwt.principalClaimName());
+        assertEquals(1, testJwt.header().size());
+        assertEquals("none", testJwt.header().get("alg"));
+        assertEquals("scope", testJwt.scopeClaimName());
+        assertEquals(expirationTimeSeconds, testJwt.expirationTime());
+        assertTrue(testJwt.isClaimType("exp", Number.class));
+        assertEquals(issuedAtSeconds, testJwt.issuedAt());
+        assertEquals("SUBJECT", testJwt.subject());
+    }
+
+    @Test
+    public void validCompactSerialization() {
+        String subject = "foo";
+        long issuedAt = 100;
+        long expirationTime = issuedAt + 60 * 60;
+        List<String> scope = Arrays.asList("scopeValue1", "scopeValue2");
+        String validCompactSerialization = compactSerialization(subject, issuedAt, expirationTime, scope);
+        OAuthBearerUnsecuredJws jws = new OAuthBearerUnsecuredJws(validCompactSerialization, "sub", "scope");
+        assertEquals(1, jws.header().size());
+        assertEquals("none", jws.header().get("alg"));
+        assertEquals(4, jws.claims().size());
+        assertEquals(subject, jws.claims().get("sub"));
+        assertEquals(subject, jws.principalName());
+        assertEquals(issuedAt, Number.class.cast(jws.claims().get("iat")).longValue());
+        assertEquals(expirationTime, Number.class.cast(jws.claims().get("exp")).longValue());
+        assertEquals(expirationTime * 1000, jws.lifetimeMs());
+        assertEquals(scope, jws.claims().get("scope"));
+        assertEquals(new HashSet<>(scope), jws.scope());
+        assertEquals(3, jws.splits().size());
+        assertEquals(validCompactSerialization.split("\\.")[0], jws.splits().get(0));
+        assertEquals(validCompactSerialization.split("\\.")[1], jws.splits().get(1));
+        assertEquals("", jws.splits().get(2));
+    }
+
+    @Test(expected = OAuthBearerIllegalTokenException.class)
+    public void missingPrincipal() {
+        String subject = null;
+        long issuedAt = 100;
+        Long expirationTime = null;
+        List<String> scope = Arrays.asList("scopeValue1", "scopeValue2");
+        String validCompactSerialization = compactSerialization(subject, issuedAt, expirationTime, scope);
+        new OAuthBearerUnsecuredJws(validCompactSerialization, "sub", "scope");
+    }
+
+    @Test(expected = OAuthBearerIllegalTokenException.class)
+    public void blankPrincipalName() {
+        String subject = "   ";
+        long issuedAt = 100;
+        long expirationTime = issuedAt + 60 * 60;
+        List<String> scope = Arrays.asList("scopeValue1", "scopeValue2");
+        String validCompactSerialization = compactSerialization(subject, issuedAt, expirationTime, scope);
+        new OAuthBearerUnsecuredJws(validCompactSerialization, "sub", "scope");
+    }
+
+    private static String compactSerialization(String subject, Long issuedAt, Long expirationTime, List<String> scope) {
+        Encoder encoder = Base64.getUrlEncoder().withoutPadding();
+        String algorithm = "none";
+        String headerJson = "{\"alg\":\"" + algorithm + "\"}";
+        String encodedHeader = encoder.encodeToString(headerJson.getBytes(StandardCharsets.UTF_8));
+        String subjectJson = subject != null ? "\"sub\":\"" + subject + "\"" : null;
+        String issuedAtJson = issuedAt != null ? "\"iat\":" + issuedAt.longValue() : null;
+        String expirationTimeJson = expirationTime != null ? "\"exp\":" + expirationTime.longValue() : null;
+        String scopeJson = scope != null ? scopeJson(scope) : null;
+        String claimsJson = claimsJson(subjectJson, issuedAtJson, expirationTimeJson, scopeJson);
+        String encodedClaims = encoder.encodeToString(claimsJson.getBytes(StandardCharsets.UTF_8));
+        return encodedHeader + "." + encodedClaims + ".";
+    }
+
+    private static String claimsJson(String... jsonValues) {
+        StringBuilder claimsJsonBuilder = new StringBuilder("{");
+        int initialLength = claimsJsonBuilder.length();
+        for (String jsonValue : jsonValues) {
+            if (jsonValue != null) {
+                if (claimsJsonBuilder.length() > initialLength)
+                    claimsJsonBuilder.append(',');
+                claimsJsonBuilder.append(jsonValue);
+            }
+        }
+        claimsJsonBuilder.append('}');
+        return claimsJsonBuilder.toString();
+    }
+
+    private static String scopeJson(List<String> scope) {
+        StringBuilder scopeJsonBuilder = new StringBuilder("\"scope\":[");
+        int initialLength = scopeJsonBuilder.length();
+        for (String scopeValue : scope) {
+            if (scopeJsonBuilder.length() > initialLength)
+                scopeJsonBuilder.append(',');
+            scopeJsonBuilder.append('"').append(scopeValue).append('"');
+        }
+        scopeJsonBuilder.append(']');
+        return scopeJsonBuilder.toString();
+    }
+
+    private static void appendCommaJsonText(StringBuilder sb, String claimName, Number claimValue) {
+        sb.append(',').append(QUOTE).append(escape(claimName)).append(QUOTE).append(":").append(claimValue);
+    }
+
+    private static void appendJsonText(StringBuilder sb, String claimName, String claimValue) {
+        sb.append(QUOTE).append(escape(claimName)).append(QUOTE).append(":").append(QUOTE).append(escape(claimValue))
+                .append(QUOTE);
+    }
+
+    private static String escape(String jsonStringValue) {
+        return jsonStringValue.replace("\"", "\\\"").replace("\\", "\\\\");
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
new file mode 100644
index 0000000..08dc1f5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+
+import org.apache.kafka.common.security.authenticator.TestJaasConfig;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Test;
+
+public class OAuthBearerUnsecuredLoginCallbackHandlerTest {
+    @Test
+    public void minimalToken() throws IOException, UnsupportedCallbackException {
+        Map<String, String> options = new HashMap<>();
+        String user = "user";
+        options.put("unsecuredLoginStringClaim_sub", user);
+        MockTime mockTime = new MockTime();
+        OAuthBearerUnsecuredLoginCallbackHandler callbackHandler = createCallbackHandler(options, mockTime);
+        OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+        callbackHandler.handle(new Callback[] {callback});
+        OAuthBearerUnsecuredJws jws = (OAuthBearerUnsecuredJws) callback.token();
+        assertNotNull("create token failed", jws);
+        long startMs = mockTime.milliseconds();
+        confirmCorrectValues(jws, user, startMs, 1000 * 60 * 60);
+        assertEquals(new HashSet<>(Arrays.asList("sub", "iat", "exp")), jws.claims().keySet());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validOptionsWithExplicitOptionValues()
+            throws IOException, UnsupportedCallbackException, LoginException {
+        String explicitScope1 = "scope1";
+        String explicitScope2 = "scope2";
+        String explicitScopeClaimName = "putScopeInHere";
+        String principalClaimName = "principal";
+        final String[] scopeClaimNameOptionValues = {null, explicitScopeClaimName};
+        for (String scopeClaimNameOptionValue : scopeClaimNameOptionValues) {
+            Map<String, String> options = new HashMap<>();
+            String user = "user";
+            options.put("unsecuredLoginStringClaim_" + principalClaimName, user);
+            options.put("unsecuredLoginListClaim_" + "list", ",1,2,");
+            options.put("unsecuredLoginListClaim_" + "emptyList1", "");
+            options.put("unsecuredLoginListClaim_" + "emptyList2", ",");
+            options.put("unsecuredLoginNumberClaim_" + "number", "1");
+            long lifetmeSeconds = 10000;
+            options.put("unsecuredLoginLifetimeSeconds", String.valueOf(lifetmeSeconds));
+            options.put("unsecuredLoginPrincipalClaimName", principalClaimName);
+            if (scopeClaimNameOptionValue != null)
+                options.put("unsecuredLoginScopeClaimName", scopeClaimNameOptionValue);
+            String actualScopeClaimName = scopeClaimNameOptionValue == null ? "scope" : explicitScopeClaimName;
+            options.put("unsecuredLoginListClaim_" + actualScopeClaimName,
+                    String.format("|%s|%s", explicitScope1, explicitScope2));
+            MockTime mockTime = new MockTime();
+            OAuthBearerUnsecuredLoginCallbackHandler callbackHandler = createCallbackHandler(options, mockTime);
+            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+            callbackHandler.handle(new Callback[] {callback});
+            OAuthBearerUnsecuredJws jws = (OAuthBearerUnsecuredJws) callback.token();
+            assertNotNull("create token failed", jws);
+            long startMs = mockTime.milliseconds();
+            confirmCorrectValues(jws, user, startMs, lifetmeSeconds * 1000);
+            Map<String, Object> claims = jws.claims();
+            assertEquals(new HashSet<>(Arrays.asList(actualScopeClaimName, principalClaimName, "iat", "exp", "number",
+                    "list", "emptyList1", "emptyList2")), claims.keySet());
+            assertEquals(new HashSet<>(Arrays.asList(explicitScope1, explicitScope2)),
+                    new HashSet<>((List<String>) claims.get(actualScopeClaimName)));
+            assertEquals(new HashSet<>(Arrays.asList(explicitScope1, explicitScope2)), jws.scope());
+            assertEquals(1.0, jws.claim("number", Number.class));
+            assertEquals(Arrays.asList("1", "2", ""), jws.claim("list", List.class));
+            assertEquals(Collections.emptyList(), jws.claim("emptyList1", List.class));
+            assertEquals(Collections.emptyList(), jws.claim("emptyList2", List.class));
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static OAuthBearerUnsecuredLoginCallbackHandler createCallbackHandler(Map<String, String> options,
+            MockTime mockTime) {
+        TestJaasConfig config = new TestJaasConfig();
+        config.createOrUpdateEntry("KafkaClient", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule",
+                (Map) options);
+        OAuthBearerUnsecuredLoginCallbackHandler callbackHandler = new OAuthBearerUnsecuredLoginCallbackHandler();
+        callbackHandler.time(mockTime);
+        callbackHandler.configure(Collections.<String, Object>emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                Arrays.asList(config.getAppConfigurationEntry("KafkaClient")[0]));
+        return callbackHandler;
+    }
+
+    private static void confirmCorrectValues(OAuthBearerUnsecuredJws jws, String user, long startMs,
+            long lifetimeSeconds) throws OAuthBearerIllegalTokenException {
+        Map<String, Object> header = jws.header();
+        assertEquals(header.size(), 1);
+        assertEquals("none", header.get("alg"));
+        assertEquals(user != null ? user : "<unknown>", jws.principalName());
+        assertEquals(Long.valueOf(startMs), jws.startTimeMs());
+        assertEquals(startMs, Math.round(jws.issuedAt().doubleValue() * 1000));
+        assertEquals(startMs + lifetimeSeconds, jws.lifetimeMs());
+        assertEquals(jws.lifetimeMs(), Math.round(jws.expirationTime().doubleValue() * 1000));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
new file mode 100644
index 0000000..bc059cf
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+
+import org.apache.kafka.common.security.authenticator.TestJaasConfig;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Test;
+
+public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
+    private static final String UNSECURED_JWT_HEADER_JSON = "{" + claimOrHeaderText("alg", "none") + "}";
+    private static final Time MOCK_TIME = new MockTime();
+    private static final String QUOTE = "\"";
+    private static final String PRINCIPAL_CLAIM_VALUE = "username";
+    private static final String PRINCIPAL_CLAIM_TEXT = claimOrHeaderText("principal", PRINCIPAL_CLAIM_VALUE);
+    private static final String SUB_CLAIM_TEXT = claimOrHeaderText("sub", PRINCIPAL_CLAIM_VALUE);
+    private static final String BAD_PRINCIPAL_CLAIM_TEXT = claimOrHeaderText("principal", 1);
+    private static final long LIFETIME_SECONDS_TO_USE = 1000 * 60 * 60;
+    private static final String EXPIRATION_TIME_CLAIM_TEXT = expClaimText(LIFETIME_SECONDS_TO_USE);
+    private static final String TOO_EARLY_EXPIRATION_TIME_CLAIM_TEXT = expClaimText(0);
+    private static final String ISSUED_AT_CLAIM_TEXT = claimOrHeaderText("iat", MOCK_TIME.milliseconds() / 1000.0);
+    private static final String SCOPE_CLAIM_TEXT = claimOrHeaderText("scope", "scope1");
+    private static final Map<String, String> MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED;
+    static {
+        Map<String, String> tmp = new HashMap<>();
+        tmp.put("unsecuredValidatorPrincipalClaimName", "principal");
+        tmp.put("unsecuredValidatorAllowableClockSkewMs", "1");
+        MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED = Collections.unmodifiableMap(tmp);
+    }
+    private static final Map<String, String> MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE;
+    static {
+        Map<String, String> tmp = new HashMap<>();
+        tmp.put("unsecuredValidatorRequiredScope", "scope1");
+        MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE = Collections.unmodifiableMap(tmp);
+    }
+    private static final Map<String, String> MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE;
+    static {
+        Map<String, String> tmp = new HashMap<>();
+        tmp.put("unsecuredValidatorRequiredScope", "scope1 scope2");
+        MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE = Collections.unmodifiableMap(tmp);
+    }
+
+    @Test
+    public void validToken() throws IOException, UnsupportedCallbackException {
+        for (final boolean includeOptionalIssuedAtClaim : new boolean[] {true, false}) {
+            String claimsJson = "{" + PRINCIPAL_CLAIM_TEXT + comma(EXPIRATION_TIME_CLAIM_TEXT)
+                    + (includeOptionalIssuedAtClaim ? comma(ISSUED_AT_CLAIM_TEXT) : "") + "}";
+            Object validationResult = validationResult(UNSECURED_JWT_HEADER_JSON, claimsJson,
+                    MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED);
+            assertTrue(validationResult instanceof OAuthBearerValidatorCallback);
+            assertTrue(((OAuthBearerValidatorCallback) validationResult).token() instanceof OAuthBearerUnsecuredJws);
+        }
+    }
+
+    @Test
+    public void badOrMissingPrincipal() throws IOException, UnsupportedCallbackException {
+        for (boolean exists : new boolean[] {true, false}) {
+            String claimsJson = "{" + EXPIRATION_TIME_CLAIM_TEXT + (exists ? comma(BAD_PRINCIPAL_CLAIM_TEXT) : "")
+                    + "}";
+            confirmFailsValidation(UNSECURED_JWT_HEADER_JSON, claimsJson, MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED);
+        }
+    }
+
+    @Test
+    public void tooEarlyExpirationTime() throws IOException, UnsupportedCallbackException {
+        String claimsJson = "{" + PRINCIPAL_CLAIM_TEXT + comma(ISSUED_AT_CLAIM_TEXT)
+                + comma(TOO_EARLY_EXPIRATION_TIME_CLAIM_TEXT) + "}";
+        confirmFailsValidation(UNSECURED_JWT_HEADER_JSON, claimsJson, MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED);
+    }
+
+    @Test
+    public void includesRequiredScope() throws IOException, UnsupportedCallbackException {
+        String claimsJson = "{" + SUB_CLAIM_TEXT + comma(EXPIRATION_TIME_CLAIM_TEXT) + comma(SCOPE_CLAIM_TEXT) + "}";
+        Object validationResult = validationResult(UNSECURED_JWT_HEADER_JSON, claimsJson,
+                MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE);
+        assertTrue(validationResult instanceof OAuthBearerValidatorCallback);
+        assertTrue(((OAuthBearerValidatorCallback) validationResult).token() instanceof OAuthBearerUnsecuredJws);
+    }
+
+    @Test
+    public void missingRequiredScope() throws IOException, UnsupportedCallbackException {
+        String claimsJson = "{" + SUB_CLAIM_TEXT + comma(EXPIRATION_TIME_CLAIM_TEXT) + comma(SCOPE_CLAIM_TEXT) + "}";
+        confirmFailsValidation(UNSECURED_JWT_HEADER_JSON, claimsJson, MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE,
+                "[scope1, scope2]");
+    }
+
+    private static void confirmFailsValidation(String headerJson, String claimsJson,
+            Map<String, String> moduleOptionsMap) throws OAuthBearerConfigException, OAuthBearerIllegalTokenException,
+            IOException, UnsupportedCallbackException {
+        confirmFailsValidation(headerJson, claimsJson, moduleOptionsMap, null);
+    }
+
+    private static void confirmFailsValidation(String headerJson, String claimsJson,
+            Map<String, String> moduleOptionsMap, String optionalFailureScope) throws OAuthBearerConfigException,
+            OAuthBearerIllegalTokenException, IOException, UnsupportedCallbackException {
+        Object validationResultObj = validationResult(headerJson, claimsJson, moduleOptionsMap);
+        assertTrue(validationResultObj instanceof OAuthBearerValidatorCallback);
+        OAuthBearerValidatorCallback callback = (OAuthBearerValidatorCallback) validationResultObj;
+        assertNull(callback.token());
+        assertNull(callback.errorOpenIDConfiguration());
+        if (optionalFailureScope == null) {
+            assertEquals("invalid_token", callback.errorStatus());
+            assertNull(callback.errorScope());
+        } else {
+            assertEquals("insufficient_scope", callback.errorStatus());
+            assertEquals(optionalFailureScope, callback.errorScope());
+        }
+    }
+
+    private static Object validationResult(String headerJson, String claimsJson, Map<String, String> moduleOptionsMap) {
+        Encoder urlEncoderNoPadding = Base64.getUrlEncoder().withoutPadding();
+        try {
+            String tokenValue = String.format("%s.%s.",
+                    urlEncoderNoPadding.encodeToString(headerJson.getBytes(StandardCharsets.UTF_8)),
+                    urlEncoderNoPadding.encodeToString(claimsJson.getBytes(StandardCharsets.UTF_8)));
+            OAuthBearerValidatorCallback callback = new OAuthBearerValidatorCallback(tokenValue);
+            createCallbackHandler(moduleOptionsMap).handle(new Callback[] {callback});
+            return callback;
+        } catch (Exception e) {
+            return e;
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static OAuthBearerUnsecuredValidatorCallbackHandler createCallbackHandler(Map<String, String> options) {
+        TestJaasConfig config = new TestJaasConfig();
+        config.createOrUpdateEntry("KafkaClient", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule",
+                (Map) options);
+        OAuthBearerUnsecuredValidatorCallbackHandler callbackHandler = new OAuthBearerUnsecuredValidatorCallbackHandler();
+        callbackHandler.configure(Collections.<String, Object>emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                Arrays.asList(config.getAppConfigurationEntry("KafkaClient")[0]));
+        return callbackHandler;
+    }
+
+    private static String comma(String value) {
+        return "," + value;
+    }
+
+    private static String claimOrHeaderText(String claimName, Number claimValue) {
+        return QUOTE + claimName + QUOTE + ":" + claimValue;
+    }
+
+    private static String claimOrHeaderText(String claimName, String claimValue) {
+        return QUOTE + claimName + QUOTE + ":" + QUOTE + claimValue + QUOTE;
+    }
+
+    private static String expClaimText(long lifetimeSeconds) {
+        return claimOrHeaderText("exp", MOCK_TIME.milliseconds() / 1000.0 + lifetimeSeconds);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationUtilsTest.java
new file mode 100644
index 0000000..f89572c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internal/unsecured/OAuthBearerValidationUtilsTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internal.unsecured;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.utils.Time;
+import org.junit.Test;
+
+public class OAuthBearerValidationUtilsTest {
+    private static final String QUOTE = "\"";
+    private static final String HEADER_COMPACT_SERIALIZATION = Base64.getUrlEncoder().withoutPadding()
+            .encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8)) + ".";
+    private static final Time TIME = Time.SYSTEM;
+
+    @Test
+    public void validateClaimForExistenceAndType() throws OAuthBearerIllegalTokenException {
+        String claimName = "foo";
+        for (Boolean exists : new Boolean[] {null, Boolean.TRUE, Boolean.FALSE}) {
+            boolean useErrorValue = exists == null;
+            for (Boolean required : new boolean[] {true, false}) {
+                StringBuilder sb = new StringBuilder("{");
+                appendJsonText(sb, "exp", 100);
+                appendCommaJsonText(sb, "sub", "principalName");
+                if (useErrorValue)
+                    appendCommaJsonText(sb, claimName, 1);
+                else if (exists != null && exists.booleanValue())
+                    appendCommaJsonText(sb, claimName, claimName);
+                sb.append("}");
+                String compactSerialization = HEADER_COMPACT_SERIALIZATION + Base64.getUrlEncoder().withoutPadding()
+                        .encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8)) + ".";
+                OAuthBearerUnsecuredJws testJwt = new OAuthBearerUnsecuredJws(compactSerialization, "sub", "scope");
+                OAuthBearerValidationResult result = OAuthBearerValidationUtils
+                        .validateClaimForExistenceAndType(testJwt, required, claimName, String.class);
+                if (useErrorValue || required && !exists.booleanValue())
+                    assertTrue(isFailureWithMessageAndNoFailureScope(result));
+                else
+                    assertTrue(isSuccess(result));
+            }
+        }
+    }
+
+    @Test
+    public void validateIssuedAt() {
+        long nowMs = TIME.milliseconds();
+        double nowClaimValue = ((double) nowMs) / 1000;
+        for (boolean exists : new boolean[] {true, false}) {
+            StringBuilder sb = new StringBuilder("{");
+            appendJsonText(sb, "exp", nowClaimValue);
+            appendCommaJsonText(sb, "sub", "principalName");
+            if (exists)
+                appendCommaJsonText(sb, "iat", nowClaimValue);
+            sb.append("}");
+            String compactSerialization = HEADER_COMPACT_SERIALIZATION + Base64.getUrlEncoder().withoutPadding()
+                    .encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8)) + ".";
+            OAuthBearerUnsecuredJws testJwt = new OAuthBearerUnsecuredJws(compactSerialization, "sub", "scope");
+            for (boolean required : new boolean[] {true, false}) {
+                for (int allowableClockSkewMs : new int[] {0, 5, 10, 20}) {
+                    for (long whenCheckOffsetMs : new long[] {-10, 0, 10}) {
+                        long whenCheckMs = nowMs + whenCheckOffsetMs;
+                        OAuthBearerValidationResult result = OAuthBearerValidationUtils.validateIssuedAt(testJwt,
+                                required, whenCheckMs, allowableClockSkewMs);
+                        if (required && !exists)
+                            assertTrue("useErrorValue || required && !exists",
+                                    isFailureWithMessageAndNoFailureScope(result));
+                        else if (!required && !exists)
+                            assertTrue("!required && !exists", isSuccess(result));
+                        else if (nowClaimValue * 1000 > whenCheckMs + allowableClockSkewMs) // issued in future
+                            assertTrue(assertionFailureMessage(nowClaimValue, allowableClockSkewMs, whenCheckMs),
+                                    isFailureWithMessageAndNoFailureScope(result));
+                        else
+                            assertTrue(assertionFailureMessage(nowClaimValue, allowableClockSkewMs, whenCheckMs),
+                                    isSuccess(result));
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void validateExpirationTime() {
+        long nowMs = TIME.milliseconds();
+        double nowClaimValue = ((double) nowMs) / 1000;
+        StringBuilder sb = new StringBuilder("{");
+        appendJsonText(sb, "exp", nowClaimValue);
+        appendCommaJsonText(sb, "sub", "principalName");
+        sb.append("}");
+        String compactSerialization = HEADER_COMPACT_SERIALIZATION
+                + Base64.getUrlEncoder().withoutPadding().encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8))
+                + ".";
+        OAuthBearerUnsecuredJws testJwt = new OAuthBearerUnsecuredJws(compactSerialization, "sub", "scope");
+        for (int allowableClockSkewMs : new int[] {0, 5, 10, 20}) {
+            for (long whenCheckOffsetMs : new long[] {-10, 0, 10}) {
+                long whenCheckMs = nowMs + whenCheckOffsetMs;
+                OAuthBearerValidationResult result = OAuthBearerValidationUtils.validateExpirationTime(testJwt,
+                        whenCheckMs, allowableClockSkewMs);
+                if (whenCheckMs - allowableClockSkewMs >= nowClaimValue * 1000) // expired
+                    assertTrue(assertionFailureMessage(nowClaimValue, allowableClockSkewMs, whenCheckMs),
+                            isFailureWithMessageAndNoFailureScope(result));
+                else
+                    assertTrue(assertionFailureMessage(nowClaimValue, allowableClockSkewMs, whenCheckMs),
+                            isSuccess(result));
+            }
+        }
+    }
+
+    @Test
+    public void validateExpirationTimeAndIssuedAtConsistency() throws OAuthBearerIllegalTokenException {
+        long nowMs = TIME.milliseconds();
+        double nowClaimValue = ((double) nowMs) / 1000;
+        for (boolean issuedAtExists : new boolean[] {true, false}) {
+            if (!issuedAtExists) {
+                StringBuilder sb = new StringBuilder("{");
+                appendJsonText(sb, "exp", nowClaimValue);
+                appendCommaJsonText(sb, "sub", "principalName");
+                sb.append("}");
+                String compactSerialization = HEADER_COMPACT_SERIALIZATION + Base64.getUrlEncoder().withoutPadding()
+                        .encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8)) + ".";
+                OAuthBearerUnsecuredJws testJwt = new OAuthBearerUnsecuredJws(compactSerialization, "sub", "scope");
+                assertTrue(isSuccess(OAuthBearerValidationUtils.validateTimeConsistency(testJwt)));
+            } else
+                for (int expirationTimeOffset = -1; expirationTimeOffset <= 1; ++expirationTimeOffset) {
+                    StringBuilder sb = new StringBuilder("{");
+                    appendJsonText(sb, "iat", nowClaimValue);
+                    appendCommaJsonText(sb, "exp", nowClaimValue + expirationTimeOffset);
+                    appendCommaJsonText(sb, "sub", "principalName");
+                    sb.append("}");
+                    String compactSerialization = HEADER_COMPACT_SERIALIZATION + Base64.getUrlEncoder().withoutPadding()
+                            .encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8)) + ".";
+                    OAuthBearerUnsecuredJws testJwt = new OAuthBearerUnsecuredJws(compactSerialization, "sub", "scope");
+                    OAuthBearerValidationResult result = OAuthBearerValidationUtils.validateTimeConsistency(testJwt);
+                    if (expirationTimeOffset <= 0)
+                        assertTrue(isFailureWithMessageAndNoFailureScope(result));
+                    else
+                        assertTrue(isSuccess(result));
+                }
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Test
+    public void validateScope() throws IOException {
+        long nowMs = TIME.milliseconds();
+        double nowClaimValue = ((double) nowMs) / 1000;
+        final List<String> noScope = Collections.emptyList();
+        final List<String> scope1 = Arrays.asList("scope1");
+        final List<String> scope1And2 = Arrays.asList("scope1", "scope2");
+        for (boolean actualScopeExists : new boolean[] {true, false}) {
+            List<? extends List> scopes = !actualScopeExists ? Arrays.<List>asList((List) null)
+                    : Arrays.asList(noScope, scope1, scope1And2);
+            for (List<String> actualScope : scopes) {
+                for (boolean requiredScopeExists : new boolean[] {true, false}) {
+                    List<? extends List> requiredScopes = !requiredScopeExists ? Arrays.<List>asList((List) null)
+                            : Arrays.asList(noScope, scope1, scope1And2);
+                    for (List<String> requiredScope : requiredScopes) {
+                        StringBuilder sb = new StringBuilder("{");
+                        appendJsonText(sb, "exp", nowClaimValue);
+                        appendCommaJsonText(sb, "sub", "principalName");
+                        if (actualScope != null)
+                            sb.append(',').append(scopeJson(actualScope));
+                        sb.append("}");
+                        String compactSerialization = HEADER_COMPACT_SERIALIZATION + Base64.getUrlEncoder()
+                                .withoutPadding().encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8)) + ".";
+                        OAuthBearerUnsecuredJws testJwt = new OAuthBearerUnsecuredJws(compactSerialization, "sub",
+                                "scope");
+                        OAuthBearerValidationResult result = OAuthBearerValidationUtils.validateScope(testJwt,
+                                requiredScope);
+                        if (!requiredScopeExists || requiredScope.isEmpty())
+                            assertTrue(isSuccess(result));
+                        else if (!actualScopeExists || actualScope.size() < requiredScope.size())
+                            assertTrue(isFailureWithMessageAndFailureScope(result));
+                        else
+                            assertTrue(isSuccess(result));
+                    }
+                }
+            }
+        }
+    }
+
+    private static String assertionFailureMessage(double claimValue, int allowableClockSkewMs, long whenCheckMs) {
+        return String.format("time=%f seconds, whenCheck = %d ms, allowableClockSkew=%d ms", claimValue, whenCheckMs,
+                allowableClockSkewMs);
+    }
+
+    private static boolean isSuccess(OAuthBearerValidationResult result) {
+        return result.success();
+    }
+
+    private static boolean isFailureWithMessageAndNoFailureScope(OAuthBearerValidationResult result) {
+        return !result.success() && !result.failureDescription().isEmpty() && result.failureScope() == null
+                && result.failureOpenIdConfig() == null;
+    }
+
+    private static boolean isFailureWithMessageAndFailureScope(OAuthBearerValidationResult result) {
+        return !result.success() && !result.failureDescription().isEmpty() && !result.failureScope().isEmpty()
+                && result.failureOpenIdConfig() == null;
+    }
+
+    private static void appendCommaJsonText(StringBuilder sb, String claimName, Number claimValue) {
+        sb.append(',').append(QUOTE).append(escape(claimName)).append(QUOTE).append(":").append(claimValue);
+    }
+
+    private static void appendCommaJsonText(StringBuilder sb, String claimName, String claimValue) {
+        sb.append(',').append(QUOTE).append(escape(claimName)).append(QUOTE).append(":").append(QUOTE)
+                .append(escape(claimValue)).append(QUOTE);
+    }
+
+    private static void appendJsonText(StringBuilder sb, String claimName, Number claimValue) {
+        sb.append(QUOTE).append(escape(claimName)).append(QUOTE).append(":").append(claimValue);
+    }
+
+    private static String escape(String jsonStringValue) {
+        return jsonStringValue.replace("\"", "\\\"").replace("\\", "\\\\");
+    }
+
+    private static String scopeJson(List<String> scope) {
+        StringBuilder scopeJsonBuilder = new StringBuilder("\"scope\":[");
+        int initialLength = scopeJsonBuilder.length();
+        for (String scopeValue : scope) {
+            if (scopeJsonBuilder.length() > initialLength)
+                scopeJsonBuilder.append(',');
+            scopeJsonBuilder.append('"').append(scopeValue).append('"');
+        }
+        scopeJsonBuilder.append(']');
+        return scopeJsonBuilder.toString();
+    }
+}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 004b531..4225cdb 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -710,7 +710,11 @@ object DynamicListenerConfig {
     KafkaConfig.SaslKerberosTicketRenewWindowFactorProp,
     KafkaConfig.SaslKerberosTicketRenewJitterProp,
     KafkaConfig.SaslKerberosMinTimeBeforeReloginProp,
-    KafkaConfig.SaslKerberosPrincipalToLocalRulesProp
+    KafkaConfig.SaslKerberosPrincipalToLocalRulesProp,
+    KafkaConfig.SaslLoginRefreshWindowFactorProp,
+    KafkaConfig.SaslLoginRefreshWindowJitterProp,
+    KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp,
+    KafkaConfig.SaslLoginRefreshBufferSecondsProp
   )
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4069b8e..a78bb4d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -221,6 +221,10 @@ object Defaults {
   val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRules = BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
+  val SaslLoginRefreshWindowFactor = SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR
+  val SaslLoginRefreshWindowJitter = SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER
+  val SaslLoginRefreshMinPeriodSeconds = SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS
+  val SaslLoginRefreshBufferSeconds = SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS
 
   /** ********* Delegation Token configuration ***********/
   val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L
@@ -436,6 +440,10 @@ object KafkaConfig {
   val SaslKerberosTicketRenewJitterProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRulesProp = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG
+  val SaslLoginRefreshWindowFactorProp = SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR
+  val SaslLoginRefreshWindowJitterProp = SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER
+  val SaslLoginRefreshMinPeriodSecondsProp = SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS
+  val SaslLoginRefreshBufferSecondsProp = SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS
 
   /** ********* Delegation Token Configuration ****************/
   val DelegationTokenMasterKeyProp = "delegation.token.master.key"
@@ -734,6 +742,10 @@ object KafkaConfig {
   val SaslKerberosTicketRenewJitterDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC
   val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
   val SaslKerberosPrincipalToLocalRulesDoc = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
+  val SaslLoginRefreshWindowFactorDoc = SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
+  val SaslLoginRefreshWindowJitterDoc = SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
+  val SaslLoginRefreshMinPeriodSecondsDoc = SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
+  val SaslLoginRefreshBufferSecondsDoc = SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 
   /** ********* Delegation Token Configuration ****************/
   val DelegationTokenMasterKeyDoc = "Master/secret key to generate and verify delegation tokens. Same key must be configured across all the brokers. " +
@@ -962,6 +974,10 @@ object KafkaConfig {
       .define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc)
       .define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc)
       .define(SaslKerberosPrincipalToLocalRulesProp, LIST, Defaults.SaslKerberosPrincipalToLocalRules, MEDIUM, SaslKerberosPrincipalToLocalRulesDoc)
+      .define(SaslLoginRefreshWindowFactorProp, DOUBLE, Defaults.SaslLoginRefreshWindowFactor, MEDIUM, SaslLoginRefreshWindowFactorDoc)
+      .define(SaslLoginRefreshWindowJitterProp, DOUBLE, Defaults.SaslLoginRefreshWindowJitter, MEDIUM, SaslLoginRefreshWindowJitterDoc)
+      .define(SaslLoginRefreshMinPeriodSecondsProp, SHORT, Defaults.SaslLoginRefreshMinPeriodSeconds, MEDIUM, SaslLoginRefreshMinPeriodSecondsDoc)
+      .define(SaslLoginRefreshBufferSecondsProp, SHORT, Defaults.SaslLoginRefreshBufferSeconds, MEDIUM, SaslLoginRefreshBufferSecondsDoc)
       /** ********* Delegation Token Configuration ****************/
       .define(DelegationTokenMasterKeyProp, PASSWORD, null, MEDIUM, DelegationTokenMasterKeyDoc)
       .define(DelegationTokenMaxLifeTimeProp, LONG, Defaults.DelegationTokenMaxLifeTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenMaxLifeTimeDoc)
diff --git a/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..ebdc0ac
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.JaasTestUtils
+
+class SaslOAuthBearerSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
+  override protected def kafkaClientSaslMechanism = "OAUTHBEARER"
+  override protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+  override val clientPrincipal = JaasTestUtils.KafkaOAuthBearerUser
+  override val kafkaPrincipal = JaasTestUtils.KafkaOAuthBearerAdmin
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6b86da6..afb297d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -703,6 +703,10 @@ class KafkaConfigTest {
         case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
         case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
         case KafkaConfig.SaslJaasConfigProp =>
+        case KafkaConfig.SaslLoginRefreshWindowFactorProp =>
+        case KafkaConfig.SaslLoginRefreshWindowJitterProp =>
+        case KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp =>
+        case KafkaConfig.SaslLoginRefreshBufferSecondsProp =>
 
         // Password encoder configs
         case KafkaConfig.PasswordEncoderSecretProp =>
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 10c7345..e8d9c30 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -83,6 +83,17 @@ object JaasTestUtils {
     ) ++ tokenProps.map { case (name, value) => name -> value }
   }
 
+  case class OAuthBearerLoginModule(username: String,
+                              debug: Boolean = false) extends JaasModule {
+
+    def name = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"
+
+    def entries: Map[String, String] = Map(
+      "unsecuredLoginStringClaim_sub" -> username
+    )
+
+  }
+
   sealed trait JaasModule {
     def name: String
     def debug: Boolean
@@ -134,6 +145,10 @@ object JaasTestUtils {
   val KafkaScramAdmin = "scram-admin"
   val KafkaScramAdminPassword = "scram-admin-secret"
 
+  val KafkaOAuthBearerUser = "oauthbearer-user"
+  val KafkaOAuthBearerUser2 = "oauthbearer-user2"
+  val KafkaOAuthBearerAdmin = "oauthbearer-admin"
+
   val serviceName = "kafka"
 
   def saslConfigs(saslProperties: Option[Properties]): Properties = {
@@ -156,7 +171,7 @@ object JaasTestUtils {
 
   // Returns the dynamic configuration, using credentials for user #1
   def clientLoginModule(mechanism: String, keytabLocation: Option[File]): String =
-    kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
+    kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword, KafkaOAuthBearerUser).toString
 
   def tokenClientLoginModule(tokenId: String, password: String): String = {
     ScramLoginModule(
@@ -200,6 +215,8 @@ object JaasTestUtils {
           KafkaScramAdmin,
           KafkaScramAdminPassword,
           debug = false)
+      case "OAUTHBEARER" =>
+        OAuthBearerLoginModule(KafkaOAuthBearerAdmin)
       case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism)
     }
     JaasSection(contextName, modules)
@@ -209,7 +226,7 @@ object JaasTestUtils {
   private def kafkaClientModule(mechanism: String, 
       keytabLocation: Option[File], clientPrincipal: String,
       plainUser: String, plainPassword: String, 
-      scramUser: String, scramPassword: String): JaasModule = {
+      scramUser: String, scramPassword: String, oauthBearerUser: String): JaasModule = {
     mechanism match {
       case "GSSAPI" =>
         Krb5LoginModule(
@@ -230,6 +247,10 @@ object JaasTestUtils {
           scramUser,
           scramPassword
         )
+      case "OAUTHBEARER" =>
+        OAuthBearerLoginModule(
+          oauthBearerUser
+        )
       case mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism)
     }
   }
@@ -239,7 +260,7 @@ object JaasTestUtils {
    */
   def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = {
     JaasSection(KafkaClientContextName, mechanism.map(m =>
-      kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2)).toSeq)
+      kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2, KafkaOAuthBearerUser2)).toSeq)
   }
 
   private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =
diff --git a/docs/security.html b/docs/security.html
index db6f487..4fcbdad 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -24,6 +24,7 @@
             <li>SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0</li>
             <li>SASL/PLAIN - starting at version 0.10.0.0</li>
             <li>SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0</li>
+            <li>SASL/OAUTHBEARER - starting at version 2.0</li>
         </ul></li>
         <li>Authentication of connections from brokers to ZooKeeper</li>
         <li>Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)</li>
@@ -273,8 +274,9 @@
             Note that ZooKeeper JAAS config may only be configured using static JAAS configuration.
 
             <p>See <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
-            <a href="#security_sasl_plain_brokerconfig">PLAIN</a> or
-            <a href="#security_sasl_scram_brokerconfig">SCRAM</a> for example broker configurations.</p></li>
+            <a href="#security_sasl_plain_brokerconfig">PLAIN</a>,
+            <a href="#security_sasl_scram_brokerconfig">SCRAM</a> or
+            <a href="#security_sasl_oauthbearer_brokerconfig">OAUTHBEARER</a> for example broker configurations.</p></li>
 
 
         <li><h5><a id="security_jaas_client"
@@ -296,8 +298,9 @@
                 are specified, the client property will be used.</p>
 
                 <p>See <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>,
-                <a href="#security_sasl_plain_clientconfig">PLAIN</a> or
-                <a href="#security_sasl_scram_clientconfig">SCRAM</a> for example configurations.</p></li>
+                <a href="#security_sasl_plain_clientconfig">PLAIN</a>,
+                <a href="#security_sasl_scram_clientconfig">SCRAM</a> or
+                <a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a> for example configurations.</p></li>
 
                 <li><h6><a id="security_client_staticjaas" href="#security_client_staticjaas">JAAS configuration using static config file</a></h6>
                 To configure SASL authentication on the clients using static JAAS config file:
@@ -305,8 +308,9 @@
                 <li>Add a JAAS config file with a client login section named <tt>KafkaClient</tt>. Configure
                     a login module in <tt>KafkaClient</tt> for the selected mechanism as described in the examples
                     for setting up <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>,
-                    <a href="#security_sasl_plain_clientconfig">PLAIN</a> or
-                    <a href="#security_sasl_scram_clientconfig">SCRAM</a>.
+                    <a href="#security_sasl_plain_clientconfig">PLAIN</a>,
+                    <a href="#security_sasl_scram_clientconfig">SCRAM</a> or
+                    <a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a>.
                     For example, <a href="#security_sasl_gssapi_clientconfig">GSSAPI</a>
                     credentials may be configured as:
                     <pre class="brush: text;">
@@ -342,6 +346,7 @@
                 <li><a href="#security_sasl_plain">PLAIN</a></li>
                 <li><a href="#security_sasl_scram">SCRAM-SHA-256</a></li>
                 <li><a href="#security_sasl_scram">SCRAM-SHA-512</a></li>
+                <li><a href="#security_sasl_oauthbearer">OAUTHBEARER</a></li>
             </ul>
         </li>
         <li><h5><a id="security_sasl_brokerconfig"
@@ -653,6 +658,205 @@
         </ol>
     </li>
 
+    <li><h4><a id="security_sasl_oauthbearer" href="#security_sasl_oauthbearer">Authentication using SASL/OAUTHBEARER</a></h4>
+        <p>The <a href="https://tools.ietf.org/html/rfc6749">OAuth 2 Authorization Framework</a> "enables a third-party application to obtain limited access to an HTTP service,
+        either on behalf of a resource owner by orchestrating an approval interaction between the resource owner and the HTTP
+        service, or by allowing the third-party application to obtain access on its own behalf."  The SASL OAUTHBEARER mechanism
+        enables the use of the framework in a SASL (i.e. a non-HTTP) context; it is defined in <a href="https://tools.ietf.org/html/rfc7628">RFC 7628</a>.
+        The default OAUTHBEARER implementation in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>
+        and is only suitable for use in non-production Kafka installations. Refer to <a href="#security_sasl_oauthbearer_security">Security Considerations</a>
+        for more details.</p>
+        <ol>
+        <li><h5><a id="security_sasl_oauthbearer_brokerconfig" href="#security_sasl_oauthbearer_brokerconfig">Configuring Kafka Brokers</a></h5>
+            <ol>
+            <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
+                <pre>
+    KafkaServer {
+        org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
+        unsecuredLoginStringClaim_sub="admin";
+    };</pre>
+                The property <tt>unsecuredLoginStringClaim_sub</tt> in the <tt>KafkaServer</tt> section is used by
+                the broker when it initiates connections to other brokers. In this example, <i>admin</i> will appear in the
+                subject (<tt>sub</tt>) claim and will be the user for inter-broker communication.</li>
+            <li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
+                <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre></li>
+            <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>.</pre> For example:
+                <pre>
+    listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
+    security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
+    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
+    sasl.enabled.mechanisms=OAUTHBEARER</pre></li>
+            </ol>
+        </li>
+
+        <li><h5><a id="security_sasl_oauthbearer_clientconfig" href="#security_sasl_oauthbearer_clientconfig">Configuring Kafka Clients</a></h5>
+            To configure SASL authentication on the clients:
+            <ol>
+	    <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
+                The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
+	        The following is an example configuration for a client for the OAUTHBEARER mechanisms:
+                <pre class="brush: text;">
+   sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+        unsecuredLoginStringClaim_sub="alice";</pre>
+
+                <p>The option <tt>unsecuredLoginStringClaim_sub</tt> is used by clients to configure
+                the subject (<tt>sub</tt>) claim, which determines the user for client connections.
+                In this example, clients connect to the broker as user <i>alice</i>.
+                Different clients within a JVM may connect as different users by specifying different subject (<tt>sub</tt>)
+                claims in <code>sasl.jaas.config</code>.</p>
+
+                <p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
+                as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
+                <tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
+            <li>Configure the following properties in producer.properties or consumer.properties:
+                <pre>
+    security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
+    sasl.mechanism=OAUTHBEARER</pre></li>
+            </ol>
+        </li>
+        <li><h5><a id="security_sasl_oauthbearer_unsecured_retrieval" href="#security_sasl_oauthbearer_unsecured_retrieval">Unsecured Token Creation Options for SASL/OAUTHBEARER</a></h5>
+            <ul>
+            <li>The default implementation of SASL/OAUTHBEARER in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>.
+            While suitable only for non-production use, it does provide the flexibility to create arbitrary tokens in a DEV or TEST environment.</li>
+            <li>Here are the various supported JAAS module options on the client side (and on the broker side if OAUTHBEARER is the inter-broker protocol):
+                 <table>
+                 <tr>
+                 <th>JAAS Module Option for Unsecured Token Creation</th>
+                 <th>Documentation</th>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredLoginStringClaim_&lt;claimname&gt;="value"</tt></td>
+                 <td>Creates a <tt>String</tt> claim with the given name and value. Any valid
+                 claim name can be specified except '<tt>iat</tt>' and '<tt>exp</tt>' (these are
+                 automatically generated).</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredLoginNumberClaim_&lt;claimname&gt;="value"</tt></td>
+                 <td>Creates a <tt>Number</tt> claim with the given name and value. Any valid
+                 claim name can be specified except '<tt>iat</tt>' and '<tt>exp</tt>' (these are
+                 automatically generated).</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredLoginListClaim_&lt;claimname&gt;="value"</tt></td>
+                 <td>Creates a <tt>String List</tt> claim with the given name and values parsed
+                 from the given value where the first character is taken as the delimiter. For
+                 example: <tt>unsecuredLoginListClaim_fubar="|value1|value2"</tt>. Any valid
+                 claim name can be specified except '<tt>iat</tt>' and '<tt>exp</tt>' (these are
+                 automatically generated).</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredLoginPrincipalClaimName</tt></td>
+                 <td>Set to a custom claim name if you wish the name of the <tt>String</tt>
+                 claim holding the principal name to be something other than '<tt>sub</tt>'.</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredLoginLifetimeSeconds</tt></td>
+                 <td>Set to an integer value if the token expiration is to be set to something
+                 other than the default value of 3600 seconds (which is 1 hour). The
+                 '<tt>exp</tt>' claim will be set to reflect the expiration time.</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredLoginScopeClaimName</tt></td>
+                 <td>Set to a custom claim name if you wish the name of the <tt>String</tt> or
+                 <tt>String List</tt> claim holding any token scope to be something other than
+                 '<tt>scope</tt>'.</td>
+                 </tr>
+                 </table>
+            </li>
+            </ul>
+        </li>
+        <li><h5><a id="security_sasl_oauthbearer_unsecured_validation" href="#security_sasl_oauthbearer_unsecured_validation">Unsecured Token Validation Options for SASL/OAUTHBEARER</a></h5>
+            <ul>
+            <li>Here are the various supported JAAS module options on the broker side for <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Token</a> validation:
+                 <table>
+                 <tr>
+                 <th>JAAS Module Option for Unsecured Token Validation</th>
+                 <th>Documentation</th>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredValidatorPrincipalClaimName="value"</tt></td>
+                 <td>Set to a non-empty value if you wish a particular <tt>String</tt> claim
+                 holding a principal name to be checked for existence; the default is to check
+                 for the existence of the '<tt>sub</tt>' claim.</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredValidatorScopeClaimName="value"</tt></td>
+                 <td>Set to a custom claim name if you wish the name of the <tt>String</tt> or
+                 <tt>String List</tt> claim holding any token scope to be something other than
+                 '<tt>scope</tt>'.</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredValidatorRequiredScope="value"</tt></td>
+                 <td>Set to a space-delimited list of scope values if you wish the
+                 <tt>String/String List</tt> claim holding the token scope to be checked to
+                 make sure it contains certain values.</td>
+                 </tr>
+                 <tr>
+                 <td><tt>unsecuredValidatorAllowableClockSkewMs="value"</tt></td>
+                 <td>Set to a positive integer value if you wish to allow up to some number of
+                 positive milliseconds of clock skew (the default is 0).</td>
+                 </tr>
+                 </table>
+            </li>
+            <li>The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments)
+            using custom login and SASL Server callback handlers.</li>
+            <li>For more details on security considerations, refer to <a href="https://tools.ietf.org/html/rfc6749#section-10">RFC 6749, Section 10</a>.</li>
+            </ul>
+        </li>
+        <li><h5><a id="security_sasl_oauthbearer_refresh" href="#security_sasl_oauthbearer_refresh">Token Refresh for SASL/OAUTHBEARER</a></h5>
+        Kafka periodically refreshes any token before it expires so that the client can continue to make
+        connections to brokers. The parameters that impact how the refresh algorithm
+        operates are specified as part of the producer/consumer/broker configuration
+        and are as follows. See the documentation for these properties elsewhere for
+        details.  The default values are usually reasonable, in which case these
+        configuration parameters would not need to be explicitly set.
+        <table>
+        <tr>
+        <th>Producer/Consumer/Broker Configuration Property</th>
+        </tr>
+        <tr>
+        <td><tt>sasl.login.refresh.window.factor</tt></td>
+        </tr>
+        <tr>
+        <td><tt>sasl.login.refresh.window.jitter</tt></td>
+        </tr>
+        <tr>
+        <td><tt>sasl.login.refresh.min.period.seconds</tt></td>
+        </tr>
+        <tr>
+        <td><tt>sasl.login.refresh.min.buffer.seconds</tt></td>
+        </tr>
+        </table>
+        </li>
+        <li><h5><a id="security_sasl_oauthbearer_prod" href="#security_sasl_oauthbearer_prod">Secure/Production Use of SASL/OAUTHBEARER</a></h5>
+        Production use cases will require writing an implementation of
+        <tt>org.apache.kafka.common.security.auth.AuthenticateCallbackHandler</tt> that can handle an instance of
+        <tt>org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback</tt> and declaring it via either the
+        <tt>sasl.login.callback.handler.class</tt> configuration option for a
+        non-broker client or via the
+        <tt>listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class</tt>
+        configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker
+        protocol).
+        <p>
+        Production use cases will also require writing an implementation of
+        <tt>org.apache.kafka.common.security.auth.AuthenticateCallbackHandler</tt> that can handle an instance of
+        <tt>org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback</tt> and declaring it via the
+        <tt>listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class</tt>
+        broker configuration option.
+        </li>
+        <li><h5><a id="security_sasl_oauthbearer_security" href="#security_sasl_oauthbearer_security">Security Considerations for SASL/OAUTHBEARER</a></h5>
+            <ul>
+            <li>The default implementation of SASL/OAUTHBEARER in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>.
+            This is suitable only for non-production use.</li>
+            <li>OAUTHBEARER should be used in production enviromnments only with TLS-encryption to prevent interception of tokens.</li>
+            <li>The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments)
+            using custom login and SASL Server callback handlers as described above.</li>
+            <li>For more details on OAuth 2 security considerations in general, refer to <a href="https://tools.ietf.org/html/rfc6749#section-10">RFC 6749, Section 10</a>.</li>
+            </ul>
+        </li>
+        </ol>
+    </li>
+
     <li><h4><a id="security_sasl_multimechanism" href="#security_sasl_multimechanism">Enabling multiple SASL mechanisms in a broker</a></h4>
         <ol>
         <li>Specify configuration for the login modules of all enabled mechanisms in the <tt>KafkaServer</tt> section of the JAAS config file. For example:
@@ -670,13 +874,14 @@
             user_admin="admin-secret"
             user_alice="alice-secret";
         };</pre></li>
-        <li>Enable the SASL mechanisms in server.properties: <pre>    sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512</pre></li>
+        <li>Enable the SASL mechanisms in server.properties: <pre>    sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER</pre></li>
         <li>Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required:
             <pre>
     security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
     sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)</pre></li>
         <li>Follow the mechanism-specific steps in <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
-            <a href="#security_sasl_plain_brokerconfig">PLAIN</a> and <a href="#security_sasl_scram_brokerconfig">SCRAM</a>
+            <a href="#security_sasl_plain_brokerconfig">PLAIN</a>,
+            <a href="#security_sasl_scram_brokerconfig">SCRAM</a> and <a href="#security_sasl_oauthbearer_brokerconfig">OAUTHBEARER</a>
             to configure SASL for the enabled mechanisms.</li>
         </ol>
     </li>

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.