You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/10/28 18:39:27 UTC

[kafka] branch trunk updated: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC (#11284)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b37953  KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC (#11284)
7b37953 is described below

commit 7b379539a5533179b0c86adb352548ac1aa82006
Author: Kirk True <ki...@mustardgrain.com>
AuthorDate: Thu Oct 28 11:36:53 2021 -0700

    KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC (#11284)
    
    This task is to provide a concrete implementation of the interfaces defined in KIP-255 to allow Kafka to connect to an OAuth/OIDC identity provider for authentication and token retrieval. While KIP-255 provides an unsecured JWT example for development, this will fill in the gap and provide a production-grade implementation.
    
    The OAuth/OIDC work will allow out-of-the-box configuration by any Apache Kafka users to connect to an external identity provider service (e.g. Okta, Auth0, Azure, etc.). The code will implement the standard OAuth client credentials grant type.
    
    The proposed change is largely composed of a pair of AuthenticateCallbackHandler implementations: one to login on the client and one to validate on the broker.
    
    See the following for more detail:
    
    KIP-768
    KAFKA-13202
    
    Reviewers: Yi Ding <di...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
 build.gradle                                       |   6 +
 checkstyle/import-control.xml                      |   4 +
 .../apache/kafka/common/config/SaslConfigs.java    | 117 ++++++-
 .../oauthbearer/secured/AccessTokenRetriever.java  |  68 ++++
 .../secured/AccessTokenRetrieverFactory.java       |  83 +++++
 .../oauthbearer/secured/AccessTokenValidator.java  |  64 ++++
 .../secured/AccessTokenValidatorFactory.java       |  74 +++++
 .../oauthbearer/secured/BasicOAuthBearerToken.java | 163 +++++++++
 .../oauthbearer/secured/ClaimValidationUtils.java  | 182 +++++++++++
 .../secured/CloseableVerificationKeyResolver.java  |  50 +++
 .../oauthbearer/secured/ConfigurationUtils.java    | 221 +++++++++++++
 .../oauthbearer/secured/FileTokenRetriever.java    |  54 +++
 .../secured/HttpAccessTokenRetriever.java          | 348 ++++++++++++++++++++
 .../security/oauthbearer/secured/Initable.java     |  35 ++
 .../oauthbearer/secured/JaasOptionsUtils.java      | 108 ++++++
 .../secured/JwksFileVerificationKeyResolver.java   | 117 +++++++
 .../secured/LoginAccessTokenValidator.java         | 135 ++++++++
 .../secured/OAuthBearerLoginCallbackHandler.java   | 296 +++++++++++++++++
 .../OAuthBearerValidatorCallbackHandler.java       | 278 ++++++++++++++++
 .../oauthbearer/secured/RefreshingHttpsJwks.java   | 364 +++++++++++++++++++++
 ...RefreshingHttpsJwksVerificationKeyResolver.java | 153 +++++++++
 .../common/security/oauthbearer/secured/Retry.java | 106 ++++++
 .../security/oauthbearer/secured/Retryable.java    |  45 +++
 .../oauthbearer/secured/SerializedJwt.java         | 107 ++++++
 .../oauthbearer/secured/UnretryableException.java  |  36 ++
 .../oauthbearer/secured/ValidateException.java     |  46 +++
 .../secured/ValidatorAccessTokenValidator.java     | 213 ++++++++++++
 .../secured/VerificationKeyResolverFactory.java    |  89 +++++
 .../oauthbearer/secured/AccessTokenBuilder.java    | 192 +++++++++++
 .../secured/AccessTokenRetrieverFactoryTest.java   |  65 ++++
 .../secured/AccessTokenValidatorFactoryTest.java   |  70 ++++
 .../secured/AccessTokenValidatorTest.java          |  92 ++++++
 .../secured/BasicOAuthBearerTokenTest.java         |  90 +++++
 .../secured/ClaimValidationUtilsTest.java          | 164 ++++++++++
 .../secured/ConfigurationUtilsTest.java            | 136 ++++++++
 .../secured/HttpAccessTokenRetrieverTest.java      | 172 ++++++++++
 .../oauthbearer/secured/JaasOptionsUtilsTest.java  |  60 ++++
 .../secured/LoginAccessTokenValidatorTest.java     |  27 ++
 .../OAuthBearerLoginCallbackHandlerTest.java       | 227 +++++++++++++
 .../oauthbearer/secured/OAuthBearerTest.java       | 198 +++++++++++
 .../OAuthBearerValidatorCallbackHandlerTest.java   | 109 ++++++
 .../secured/RefreshingHttpsJwksTest.java           | 197 +++++++++++
 .../security/oauthbearer/secured/RetryTest.java    | 136 ++++++++
 .../secured/ValidatorAccessTokenValidatorTest.java |  59 ++++
 core/src/main/scala/kafka/server/KafkaConfig.scala |  51 +++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  14 +
 gradle/dependencies.gradle                         |   2 +
 .../apache/kafka/tools/OAuthCompatibilityTool.java | 278 ++++++++++++++++
 48 files changed, 5896 insertions(+), 5 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7610d4e..b79cec2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -829,6 +829,7 @@ project(':core') {
     implementation libs.jacksonDataformatCsv
     implementation libs.jacksonJDK8Datatypes
     implementation libs.joptSimple
+    implementation libs.jose4j
     implementation libs.metrics
     implementation libs.scalaCollectionCompat
     implementation libs.scalaJava8Compat
@@ -1200,6 +1201,7 @@ project(':clients') {
 
     compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
     compileOnly libs.jacksonJDK8Datatypes
+    compileOnly libs.jose4j          // for SASL/OAUTHBEARER JWT validation; only used by broker
 
     testImplementation libs.bcpkix
     testImplementation libs.junitJupiter
@@ -1208,6 +1210,7 @@ project(':clients') {
     testRuntimeOnly libs.slf4jlog4j
     testRuntimeOnly libs.jacksonDatabind
     testRuntimeOnly libs.jacksonJDK8Datatypes
+    testImplementation libs.jose4j
     testImplementation libs.jacksonJaxrsJsonProvider
   }
 
@@ -1604,6 +1607,7 @@ project(':tools') {
     implementation libs.slf4jApi
     implementation libs.log4j
 
+    implementation libs.jose4j                    // for SASL/OAUTHBEARER JWT validation
     implementation libs.jacksonJaxrsJsonProvider
 
     testImplementation project(':clients')
@@ -1701,6 +1705,7 @@ project(':shell') {
     implementation project(':metadata')
     implementation project(':raft')
 
+    implementation libs.jose4j                    // for SASL/OAUTHBEARER JWT validation
     implementation libs.jacksonJaxrsJsonProvider
 
     testImplementation project(':clients')
@@ -2377,6 +2382,7 @@ project(':connect:runtime') {
 
     implementation libs.slf4jApi
     implementation libs.log4j
+    implementation libs.jose4j                    // for SASL/OAUTHBEARER JWT validation
     implementation libs.jacksonAnnotations
     implementation libs.jacksonJaxrsJsonProvider
     implementation libs.jerseyContainerServlet
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 88538d0..8b7a8e9 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -139,6 +139,9 @@
       </subpackage>
       <subpackage name="oauthbearer">
         <allow pkg="com.fasterxml.jackson.databind" />
+        <subpackage name="secured">
+          <allow pkg="org.jose4j" />
+        </subpackage>
       </subpackage>
     </subpackage>
 
@@ -338,6 +341,7 @@
     <allow pkg="org.apache.kafka.clients.producer" />
     <allow pkg="org.apache.kafka.clients.consumer" />
     <allow pkg="com.fasterxml.jackson" />
+    <allow pkg="org.jose4j" />
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.log4j" />
   </subpackage>
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 26897b2..a897b5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.config;
 import org.apache.kafka.common.config.ConfigDef.Range;
 
 public class SaslConfigs {
+
+    private static final String OAUTHBEARER_NOTE = " Currently applies only to OAUTHBEARER.";
+
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
      */
@@ -75,20 +78,22 @@ public class SaslConfigs {
     public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC = "Login refresh thread will sleep until the specified window factor relative to the"
             + " credential's lifetime has been reached, at which time it will try to refresh the credential."
             + " Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used"
-            + " if no value is specified. Currently applies only to OAUTHBEARER.";
+            + " if no value is specified."
+            + OAUTHBEARER_NOTE;
     public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80;
 
     public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER = "sasl.login.refresh.window.jitter";
     public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC = "The maximum amount of random jitter relative to the credential's lifetime"
             + " that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;"
-            + " a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.";
+            + " a default value of 0.05 (5%) is used if no value is specified."
+            + OAUTHBEARER_NOTE;
     public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05;
 
     public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS = "sasl.login.refresh.min.period.seconds";
     public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC = "The desired minimum time for the login refresh thread to wait before refreshing a credential,"
             + " in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified.  This value and "
             + " sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
-            + " Currently applies only to OAUTHBEARER.";
+            + OAUTHBEARER_NOTE;
     public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60;
 
     public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS = "sasl.login.refresh.buffer.seconds";
@@ -96,9 +101,97 @@ public class SaslConfigs {
             + " in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain"
             + " as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of  300 (5 minutes) is used if no value is specified."
             + " This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
-            + " Currently applies only to OAUTHBEARER.";
+            + OAUTHBEARER_NOTE;
     public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300;
 
+    public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms";
+    public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider connection timeout."
+            + OAUTHBEARER_NOTE;
+
+    public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms";
+    public static final String SASL_LOGIN_READ_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider read timeout."
+            + OAUTHBEARER_NOTE;
+
+    private static final String LOGIN_EXPONENTIAL_BACKOFF_NOTE = " Login uses an exponential backoff algorithm with an initial wait based on the"
+            + " sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the"
+            + " sasl.login.retry.backoff.max.ms setting."
+            + OAUTHBEARER_NOTE;
+
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS = "sasl.login.retry.backoff.max.ms";
+    public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS = 10000;
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC = "The (optional) value in milliseconds for the maximum wait between login attempts to the"
+            + " external authentication provider."
+            + LOGIN_EXPONENTIAL_BACKOFF_NOTE;
+
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms";
+    public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS = 100;
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MS_DOC = "The (optional) value in milliseconds for the initial wait between login attempts to the external"
+            + " authentication provider."
+            + LOGIN_EXPONENTIAL_BACKOFF_NOTE;
+
+    public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "sasl.oauthbearer.scope.claim.name";
+    public static final String DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "scope";
+    public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC = "The OAuth claim for the scope is often named \"" + DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME + "\", but this (optional)"
+            + " setting can provide a different name to use for the scope included in the JWT payload's claims if the OAuth/OIDC provider uses a different"
+            + " name for that claim.";
+
+    public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sasl.oauthbearer.sub.claim.name";
+    public static final String DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sub";
+    public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC = "The OAuth claim for the subject is often named \"" + DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME + "\", but this (optional)"
+            + " setting can provide a different name to use for the subject included in the JWT payload's claims if the OAuth/OIDC provider uses a different"
+            + " name for that claim.";
+
+    public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url";
+    public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC = "The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer's token"
+            + " endpoint URL to which requests will be made to login based on the configuration in " + SASL_JAAS_CONFIG + ". If the URL is file-based, it"
+            + " specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.";
+
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_URL = "sasl.oauthbearer.jwks.endpoint.url";
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC = "The OAuth/OIDC provider URL from which the provider's"
+            + " <a href=\"https://datatracker.ietf.org/doc/html/rfc7517#section-5\">JWKS (JSON Web Key Set)</a> can be retrieved. The URL can be HTTP(S)-based or file-based."
+            + " If the URL is HTTP(S)-based, the JWKS data will be retrieved from the OAuth/OIDC provider via the configured URL on broker startup. All then-current"
+            + " keys will be cached on the broker for incoming requests. If an authentication request is received for a JWT that includes a \"kid\" header claim value that"
+            + " isn't yet in the cache, the JWKS endpoint will be queried again on demand. However, the broker polls the URL every sasl.oauthbearer.jwks.endpoint.refresh.ms"
+            + " milliseconds to refresh the cache with any forthcoming keys before any JWT requests that include them are received."
+            + " If the URL is file-based, the broker will load the JWKS file from a configured location on startup. In the event that the JWT includes a \"kid\" header"
+            + " value that isn't in the JWKS file, the broker will reject the JWT and authentication will fail.";
+
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS = "sasl.oauthbearer.jwks.endpoint.refresh.ms";
+    public static final long DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS = 60 * 60 * 1000;
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC = "The (optional) value in milliseconds for the broker to wait between refreshing its JWKS (JSON Web Key Set)"
+            + " cache that contains the keys to verify the signature of the JWT.";
+
+    private static final String JWKS_EXPONENTIAL_BACKOFF_NOTE = " JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the"
+        + " sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the"
+        + " sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.";
+
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS = "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms";
+    public static final long DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS = 10000;
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC = "The (optional) value in milliseconds for the maximum wait between attempts to retrieve the JWKS (JSON Web Key Set)"
+        + " from the external authentication provider."
+        + JWKS_EXPONENTIAL_BACKOFF_NOTE;
+
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS = "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms";
+    public static final long DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS = 100;
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC = "The (optional) value in milliseconds for the initial wait between JWKS (JSON Web Key Set) retrieval attempts from the external"
+        + " authentication provider."
+        + JWKS_EXPONENTIAL_BACKOFF_NOTE;
+
+    public static final String SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS = "sasl.oauthbearer.clock.skew.seconds";
+    public static final int DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS = 30;
+    public static final String SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC = "The (optional) value in seconds to allow for differences between the time of the OAuth/OIDC identity provider and"
+            + " the broker.";
+
+    public static final String SASL_OAUTHBEARER_EXPECTED_AUDIENCE = "sasl.oauthbearer.expected.audience";
+    public static final String SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC = "The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the"
+            + " expected audiences. The JWT will be inspected for the standard OAuth \"aud\" claim and if this value is set, the broker will match the value from JWT's \"aud\" claim "
+            + " to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.";
+
+    public static final String SASL_OAUTHBEARER_EXPECTED_ISSUER = "sasl.oauthbearer.expected.issuer";
+    public static final String SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC = "The (optional) setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will"
+            + " be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. If there is no"
+            + " match, the broker will reject the JWT and authentication will fail.";
+
     public static void addClientSaslSupport(ConfigDef config) {
         config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
@@ -113,6 +206,20 @@ public class SaslConfigs {
                 .define(SaslConfigs.SASL_JAAS_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_JAAS_CONFIG_DOC)
                 .define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
                 .define(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS_DOC)
-                .define(SaslConfigs.SASL_LOGIN_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_LOGIN_CLASS_DOC);
+                .define(SaslConfigs.SASL_LOGIN_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_LOGIN_CLASS_DOC)
+                .define(SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS, ConfigDef.Type.INT, null, ConfigDef.Importance.LOW, SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC)
+                .define(SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS, ConfigDef.Type.INT, null, ConfigDef.Importance.LOW, SASL_LOGIN_READ_TIMEOUT_MS_DOC)
+                .define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS, ConfigDef.Type.LONG, DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS, ConfigDef.Importance.LOW, SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC)
+                .define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS, ConfigDef.Type.LONG, DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS, ConfigDef.Importance.LOW, SASL_LOGIN_RETRY_BACKOFF_MS_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, ConfigDef.Type.STRING, DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, ConfigDef.Type.STRING, DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, ConfigDef.Type.INT, DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
+                .define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetriever.java
new file mode 100644
index 0000000..e4ae599
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetriever.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * An <code>AccessTokenRetriever</code> is the internal API by which the login module will
+ * retrieve an access token for use in authorization by the broker. The implementation may
+ * involve authentication to a remote system, or it can be as simple as loading the contents
+ * of a file or configuration setting.
+ *
+ * <i>Retrieval</i> is a separate concern from <i>validation</i>, so it isn't necessary for
+ * the <code>AccessTokenRetriever</code> implementation to validate the integrity of the JWT
+ * access token.
+ *
+ * @see HttpAccessTokenRetriever
+ * @see FileTokenRetriever
+ */
+
+public interface AccessTokenRetriever extends Initable, Closeable {
+
+    /**
+     * Retrieves a JWT access token in its serialized three-part form. The implementation
+     * is free to determine how it should be retrieved but should not perform validation
+     * on the result.
+     *
+     * <b>Note</b>: This is a blocking function and callers should be aware that the
+     * implementation may be communicating over a network, with the file system, coordinating
+     * threads, etc. The facility in the {@link javax.security.auth.spi.LoginModule} from
+     * which this is ultimately called does not provide an asynchronous approach.
+     *
+     * @return Non-<code>null</code> JWT access token string
+     *
+     * @throws IOException Thrown on errors related to IO during retrieval
+     */
+
+    String retrieve() throws IOException;
+
+    /**
+     * Lifecycle method to perform a clean shutdown of the retriever. This must
+     * be performed by the caller to ensure the correct state, freeing up and releasing any
+     * resources performed in {@link #init()}.
+     *
+     * @throws IOException Thrown on errors related to IO during closure
+     */
+
+    default void close() throws IOException {
+        // This method left intentionally blank.
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactory.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactory.java
new file mode 100644
index 0000000..e7b3b5c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
+
+import java.net.URL;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+
+public class AccessTokenRetrieverFactory  {
+
+    /**
+     * Create an {@link AccessTokenRetriever} from the given SASL and JAAS configuration.
+     *
+     * <b>Note</b>: the returned <code>AccessTokenRetriever</code> is <em>not</em> initialized
+     * here and must be done by the caller prior to use.
+     *
+     * @param configs    SASL configuration
+     * @param jaasConfig JAAS configuration
+     *
+     * @return Non-<code>null</code> {@link AccessTokenRetriever}
+     */
+
+    public static AccessTokenRetriever create(Map<String, ?> configs, Map<String, Object> jaasConfig) {
+        return create(configs, null, jaasConfig);
+    }
+
+    public static AccessTokenRetriever create(Map<String, ?> configs,
+        String saslMechanism,
+        Map<String, Object> jaasConfig) {
+        ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
+        URL tokenEndpointUrl = cu.validateUrl(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
+
+        if (tokenEndpointUrl.getProtocol().toLowerCase(Locale.ROOT).equals("file")) {
+            return new FileTokenRetriever(cu.validateFile(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL));
+        } else {
+            JaasOptionsUtils jou = new JaasOptionsUtils(jaasConfig);
+            String clientId = jou.validateString(CLIENT_ID_CONFIG);
+            String clientSecret = jou.validateString(CLIENT_SECRET_CONFIG);
+            String scope = jou.validateString(SCOPE_CONFIG, false);
+
+            SSLSocketFactory sslSocketFactory = null;
+
+            if (jou.shouldCreateSSLSocketFactory(tokenEndpointUrl))
+                sslSocketFactory = jou.createSSLSocketFactory();
+
+            return new HttpAccessTokenRetriever(clientId,
+                clientSecret,
+                scope,
+                sslSocketFactory,
+                tokenEndpointUrl.toString(),
+                cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS),
+                cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
+                cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false),
+                cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false));
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidator.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidator.java
new file mode 100644
index 0000000..2a8c2b0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+
+/**
+ * An instance of <code>AccessTokenValidator</code> acts as a function object that, given an access
+ * token in base-64 encoded JWT format, can parse the data, perform validation, and construct an
+ * {@link OAuthBearerToken} for use by the caller.
+ *
+ * The primary reason for this abstraction is that client and broker may have different libraries
+ * available to them to perform these operations. Additionally, the exact steps for validation may
+ * differ between implementations. To put this more concretely: the implementation in the Kafka
+ * client does not have bundled a robust library to perform this logic, and it is not the
+ * responsibility of the client to perform vigorous validation. However, the Kafka broker ships with
+ * a richer set of library dependencies that can perform more substantial validation and is also
+ * expected to perform a trust-but-verify test of the access token's signature.
+ *
+ * See:
+ *
+ * <ul>
+ *     <li><a href="https://datatracker.ietf.org/doc/html/rfc6749#section-1.4">RFC 6749, Section 1.4</a></li>
+ *     <li><a href="https://datatracker.ietf.org/doc/html/rfc6750#section-2.1">RFC 6750, Section 2.1</a></li>
+ *     <li><a href="https://datatracker.ietf.org/doc/html/draft-ietf-oauth-access-token-jwt">RFC 6750, Section 2.1</a></li>
+ * </ul>
+ *
+ * @see LoginAccessTokenValidator A basic AccessTokenValidator used by client-side login
+ *                                authentication
+ * @see ValidatorAccessTokenValidator A more robust AccessTokenValidator that is used on the broker
+ *                                    to validate the token's contents and verify the signature
+ */
+
+public interface AccessTokenValidator {
+
+    /**
+     * Accepts an OAuth JWT access token in base-64 encoded format, validates, and returns an
+     * OAuthBearerToken.
+     *
+     * @param accessToken Non-<code>null</code> JWT access token
+     *
+     * @return {@link OAuthBearerToken}
+     *
+     * @throws ValidateException Thrown on errors performing validation of given token
+     */
+
+    OAuthBearerToken validate(String accessToken) throws ValidateException;
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactory.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactory.java
new file mode 100644
index 0000000..232ebc1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+
+public class AccessTokenValidatorFactory {
+
+    public static AccessTokenValidator create(Map<String, ?> configs) {
+        return create(configs, (String) null);
+    }
+
+    public static AccessTokenValidator create(Map<String, ?> configs, String saslMechanism) {
+        ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
+        String scopeClaimName = cu.get(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
+        String subClaimName = cu.get(SASL_OAUTHBEARER_SUB_CLAIM_NAME);
+        return new LoginAccessTokenValidator(scopeClaimName, subClaimName);
+    }
+
+    public static AccessTokenValidator create(Map<String, ?> configs,
+        VerificationKeyResolver verificationKeyResolver) {
+        return create(configs, null, verificationKeyResolver);
+    }
+
+    public static AccessTokenValidator create(Map<String, ?> configs,
+        String saslMechanism,
+        VerificationKeyResolver verificationKeyResolver) {
+        ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
+        Set<String> expectedAudiences = null;
+        List<String> l = cu.get(SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
+
+        if (l != null)
+            expectedAudiences = Collections.unmodifiableSet(new HashSet<>(l));
+
+        Integer clockSkew = cu.validateInteger(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, false);
+        String expectedIssuer = cu.validateString(SASL_OAUTHBEARER_EXPECTED_ISSUER, false);
+        String scopeClaimName = cu.validateString(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
+        String subClaimName = cu.validateString(SASL_OAUTHBEARER_SUB_CLAIM_NAME);
+
+        return new ValidatorAccessTokenValidator(clockSkew,
+            expectedAudiences,
+            expectedIssuer,
+            verificationKeyResolver,
+            scopeClaimName,
+            subClaimName);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/BasicOAuthBearerToken.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/BasicOAuthBearerToken.java
new file mode 100644
index 0000000..8527f80
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/BasicOAuthBearerToken.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.util.Set;
+import java.util.StringJoiner;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+
+/**
+ * An implementation of the {@link OAuthBearerToken} that fairly straightforwardly stores the values
+ * given to its constructor (except the scope set which is copied to avoid modifications).
+ *
+ * Very little validation is applied here with respect to the validity of the given values. All
+ * validation is assumed to happen by users of this class.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515: JSON Web Signature (JWS)</a>
+ */
+
+public class BasicOAuthBearerToken implements OAuthBearerToken {
+
+    private final String token;
+
+    private final Set<String> scopes;
+
+    private final Long lifetimeMs;
+
+    private final String principalName;
+
+    private final Long startTimeMs;
+
+    /**
+     * Creates a new OAuthBearerToken instance around the given values.
+     *
+     * @param token         Value containing the compact serialization as a base 64 string that
+     *                      can be parsed, decoded, and validated as a well-formed JWS. Must be
+     *                      non-<code>null</code>, non-blank, and non-whitespace only.
+     * @param scopes        Set of non-<code>null</code> scopes. May contain case-sensitive
+     *                      "duplicates". The given set is copied and made unmodifiable so neither
+     *                      the caller of this constructor nor any downstream users can modify it.
+     * @param lifetimeMs    The token's lifetime, expressed as the number of milliseconds since the
+     *                      epoch. Must be non-negative.
+     * @param principalName The name of the principal to which this credential applies. Must be
+     *                      non-<code>null</code>, non-blank, and non-whitespace only.
+     * @param startTimeMs   The token's start time, expressed as the number of milliseconds since
+     *                      the epoch, if available, otherwise <code>null</code>. Must be
+     *                      non-negative if a non-<code>null</code> value is provided.
+     */
+
+    public BasicOAuthBearerToken(String token,
+        Set<String> scopes,
+        long lifetimeMs,
+        String principalName,
+        Long startTimeMs) {
+        this.token = token;
+        this.scopes = scopes;
+        this.lifetimeMs = lifetimeMs;
+        this.principalName = principalName;
+        this.startTimeMs = startTimeMs;
+    }
+
+    /**
+     * The <code>b64token</code> value as defined in
+     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
+     * 2.1</a>
+     *
+     * @return <code>b64token</code> value as defined in
+     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
+     *         Section 2.1</a>
+     */
+
+    @Override
+    public String value() {
+        return token;
+    }
+
+    /**
+     * The token's scope of access, as per
+     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
+     * 1.4</a>
+     *
+     * @return the token's (always non-null but potentially empty) scope of access,
+     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
+     *         6749 Section 1.4</a>. Note that all values in the returned set will
+     *         be trimmed of preceding and trailing whitespace, and the result will
+     *         never contain the empty string.
+     */
+
+    @Override
+    public Set<String> scope() {
+        // Immutability of the set is performed in the constructor/validation utils class, so
+        // we don't need to repeat it here.
+        return scopes;
+    }
+
+    /**
+     * The token's lifetime, expressed as the number of milliseconds since the
+     * epoch, as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
+     * 6749 Section 1.4</a>
+     *
+     * @return the token's lifetime, expressed as the number of milliseconds since
+     *         the epoch, as per
+     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
+     *         Section 1.4</a>.
+     */
+
+    @Override
+    public long lifetimeMs() {
+        return lifetimeMs;
+    }
+
+    /**
+     * The name of the principal to which this credential applies
+     *
+     * @return the always non-null/non-empty principal name
+     */
+
+    @Override
+    public String principalName() {
+        return principalName;
+    }
+
+    /**
+     * When the credential became valid, in terms of the number of milliseconds
+     * since the epoch, if known, otherwise null. An expiring credential may not
+     * necessarily indicate when it was created -- just when it expires -- so we
+     * need to support a null return value here.
+     *
+     * @return the time when the credential became valid, in terms of the number of
+     *         milliseconds since the epoch, if known, otherwise null
+     */
+
+    @Override
+    public Long startTimeMs() {
+        return startTimeMs;
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", BasicOAuthBearerToken.class.getSimpleName() + "[", "]")
+            .add("token='" + token + "'")
+            .add("scopes=" + scopes)
+            .add("lifetimeMs=" + lifetimeMs)
+            .add("principalName='" + principalName + "'")
+            .add("startTimeMs=" + startTimeMs)
+            .toString();
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ClaimValidationUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ClaimValidationUtils.java
new file mode 100644
index 0000000..bb08ec5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ClaimValidationUtils.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Simple utility class to perform basic cleaning and validation on input values so that they're
+ * performed consistently throughout the code base.
+ */
+
+public class ClaimValidationUtils {
+
+    /**
+     * Validates that the scopes are valid, where <i>invalid</i> means <i>any</i> of
+     * the following:
+     *
+     * <ul>
+     *     <li>Collection is <code>null</code></li>
+     *     <li>Collection has duplicates</li>
+     *     <li>Any of the elements in the collection are <code>null</code></li>
+     *     <li>Any of the elements in the collection are zero length</li>
+     *     <li>Any of the elements in the collection are whitespace only</li>
+     * </ul>
+     *
+     * @param scopeClaimName Name of the claim used for the scope values
+     * @param scopes         Collection of String scopes
+     *
+     * @return Unmodifiable {@link Set} that includes the values of the original set, but with
+     *         each value trimmed
+     *
+     * @throws ValidateException Thrown if the value is <code>null</code>, contains duplicates, or
+     *                           if any of the values in the set are <code>null</code>, empty,
+     *                           or whitespace only
+     */
+
+    public static Set<String> validateScopes(String scopeClaimName, Collection<String> scopes) throws ValidateException {
+        if (scopes == null)
+            throw new ValidateException(String.format("%s value must be non-null", scopeClaimName));
+
+        Set<String> copy = new HashSet<>();
+
+        for (String scope : scopes) {
+            scope = validateString(scopeClaimName, scope);
+
+            if (copy.contains(scope))
+                throw new ValidateException(String.format("%s value must not contain duplicates - %s already present", scopeClaimName, scope));
+
+            copy.add(scope);
+        }
+
+        return Collections.unmodifiableSet(copy);
+    }
+
+    /**
+     * Validates that the given lifetime is valid, where <i>invalid</i> means <i>any</i> of
+     * the following:
+     *
+     * <ul>
+     *     <li><code>null</code></li>
+     *     <li>Negative</li>
+     * </ul>
+     *
+     * @param claimName  Name of the claim
+     * @param claimValue Expiration time (in milliseconds)
+     *
+     * @return Input parameter, as provided
+     *
+     * @throws ValidateException Thrown if the value is <code>null</code> or negative
+     */
+
+    public static long validateExpiration(String claimName, Long claimValue) throws ValidateException {
+        if (claimValue == null)
+            throw new ValidateException(String.format("%s value must be non-null", claimName));
+
+        if (claimValue < 0)
+            throw new ValidateException(String.format("%s value must be non-negative; value given was \"%s\"", claimName, claimValue));
+
+        return claimValue;
+    }
+
+    /**
+     * Validates that the given claim value is valid, where <i>invalid</i> means <i>any</i> of
+     * the following:
+     *
+     * <ul>
+     *     <li><code>null</code></li>
+     *     <li>Zero length</li>
+     *     <li>Whitespace only</li>
+     * </ul>
+     *
+     * @param claimName  Name of the claim
+     * @param claimValue Name of the subject
+     *
+     * @return Trimmed version of the <code>claimValue</code> parameter
+     *
+     * @throws ValidateException Thrown if the value is <code>null</code>, empty, or whitespace only
+     */
+
+    public static String validateSubject(String claimName, String claimValue) throws ValidateException {
+        return validateString(claimName, claimValue);
+    }
+
+    /**
+     * Validates that the given issued at claim name is valid, where <i>invalid</i> means <i>any</i> of
+     * the following:
+     *
+     * <ul>
+     *     <li>Negative</li>
+     * </ul>
+     *
+     * @param claimName  Name of the claim
+     * @param claimValue Start time (in milliseconds) or <code>null</code> if not used
+     *
+     * @return Input parameter, as provided
+     *
+     * @throws ValidateException Thrown if the value is negative
+     */
+
+    public static Long validateIssuedAt(String claimName, Long claimValue) throws ValidateException {
+        if (claimValue != null && claimValue < 0)
+            throw new ValidateException(String.format("%s value must be null or non-negative; value given was \"%s\"", claimName, claimValue));
+
+        return claimValue;
+    }
+
+    /**
+     * Validates that the given claim name override is valid, where <i>invalid</i> means
+     * <i>any</i> of the following:
+     *
+     * <ul>
+     *     <li><code>null</code></li>
+     *     <li>Zero length</li>
+     *     <li>Whitespace only</li>
+     * </ul>
+     *
+     * @param name  "Standard" name of the claim, e.g. <code>sub</code>
+     * @param value "Override" name of the claim, e.g. <code>email</code>
+     *
+     * @return Trimmed version of the <code>value</code> parameter
+     *
+     * @throws ValidateException Thrown if the value is <code>null</code>, empty, or whitespace only
+     */
+
+    public static String validateClaimNameOverride(String name, String value) throws ValidateException {
+        return validateString(name, value);
+    }
+
+    private static String validateString(String name, String value) throws ValidateException {
+        if (value == null)
+            throw new ValidateException(String.format("%s value must be non-null", name));
+
+        if (value.isEmpty())
+            throw new ValidateException(String.format("%s value must be non-empty", name));
+
+        value = value.trim();
+
+        if (value.isEmpty())
+            throw new ValidateException(String.format("%s value must not contain only whitespace", name));
+
+        return value;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/CloseableVerificationKeyResolver.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/CloseableVerificationKeyResolver.java
new file mode 100644
index 0000000..b74aaa1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/CloseableVerificationKeyResolver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+
+/**
+ * The {@link OAuthBearerValidatorCallbackHandler} uses a {@link VerificationKeyResolver} as
+ * part of its validation of the incoming JWT. Some of the <code>VerificationKeyResolver</code>
+ * implementations use resources like threads, connections, etc. that should be properly closed
+ * when no longer needed. Since the <code>VerificationKeyResolver</code> interface itself doesn't
+ * define a <code>close</code> method, we provide a means to do that here.
+ *
+ * @see OAuthBearerValidatorCallbackHandler
+ * @see VerificationKeyResolver
+ * @see Closeable
+ */
+
+public interface CloseableVerificationKeyResolver extends Initable, Closeable, VerificationKeyResolver {
+
+    /**
+     * Lifecycle method to perform a clean shutdown of the {@link VerificationKeyResolver}.
+     * This must be performed by the caller to ensure the correct state, freeing up
+     * and releasing any resources performed in {@link #init()}.
+     *
+     * @throws IOException Thrown on errors related to IO during closure
+     */
+
+    default void close() throws IOException {
+        // This method left intentionally blank.
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java
new file mode 100644
index 0000000..f17295d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+
+/**
+ * <code>ConfigurationUtils</code> is a utility class to perform basic configuration-related
+ * logic and is separated out here for easier, more direct testing.
+ */
+
+public class ConfigurationUtils {
+
+    private final Map<String, ?> configs;
+
+    private final String prefix;
+
+    public ConfigurationUtils(Map<String, ?> configs) {
+        this(configs, null);
+    }
+
+    public ConfigurationUtils(Map<String, ?> configs, String saslMechanism) {
+        this.configs = configs;
+
+        if (saslMechanism != null && !saslMechanism.trim().isEmpty())
+            this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim());
+        else
+            this.prefix = null;
+    }
+
+    /**
+     * Validates that, if a value is supplied, is a file that:
+     *
+     * <li>
+     *     <ul>exists</ul>
+     *     <ul>has read permission</ul>
+     *     <ul>points to a file</ul>
+     * </li>
+     *
+     * If the value is null or an empty string, it is assumed to be an "empty" value and thus.
+     * ignored. Any whitespace is trimmed off of the beginning and end.
+     */
+
+    public Path validateFile(String name) {
+        URL url = validateUrl(name);
+        File file;
+
+        try {
+            file = new File(url.toURI().getRawPath()).getAbsoluteFile();
+        } catch (URISyntaxException e) {
+            throw new ConfigException(name, url.toString(), String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, url, e.getMessage()));
+        }
+
+        if (!file.exists())
+            throw new ConfigException(name, file, String.format("The OAuth configuration option %s contains a file (%s) that doesn't exist", name, file));
+
+        if (!file.canRead())
+            throw new ConfigException(name, file, String.format("The OAuth configuration option %s contains a file (%s) that doesn't have read permission", name, file));
+
+        if (file.isDirectory())
+            throw new ConfigException(name, file, String.format("The OAuth configuration option %s references a directory (%s), not a file", name, file));
+
+        return file.toPath();
+    }
+
+    /**
+     * Validates that, if a value is supplied, is a value that:
+     *
+     * <li>
+     *     <ul>is an Integer</ul>
+     *     <ul>has a value that is not less than the provided minimum value</ul>
+     * </li>
+     *
+     * If the value is null or an empty string, it is assumed to be an "empty" value and thus
+     * ignored. Any whitespace is trimmed off of the beginning and end.
+     */
+
+    public Integer validateInteger(String name, boolean isRequired) {
+        Integer value = get(name);
+
+        if (value == null) {
+            if (isRequired)
+                throw new ConfigException(name, null, String.format("The OAuth configuration option %s must be non-null", name));
+            else
+                return null;
+        }
+
+        return value;
+    }
+
+    /**
+     * Validates that, if a value is supplied, is a value that:
+     *
+     * <li>
+     *     <ul>is an Integer</ul>
+     *     <ul>has a value that is not less than the provided minimum value</ul>
+     * </li>
+     *
+     * If the value is null or an empty string, it is assumed to be an "empty" value and thus
+     * ignored. Any whitespace is trimmed off of the beginning and end.
+     */
+
+    public Long validateLong(String name) {
+        return validateLong(name, true);
+    }
+
+    public Long validateLong(String name, boolean isRequired) {
+        return validateLong(name, isRequired, null);
+    }
+
+    public Long validateLong(String name, boolean isRequired, Long min) {
+        Long value = get(name);
+
+        if (value == null) {
+            if (isRequired)
+                throw new ConfigException(name, null, String.format("The OAuth configuration option %s must be non-null", name));
+            else
+                return null;
+        }
+
+        if (min != null && value < min)
+            throw new ConfigException(name, value, String.format("The OAuth configuration option %s value must be at least %s", name, min));
+
+        return value;
+    }
+
+    /**
+     * Validates that the configured URL that:
+     *
+     * <li>
+     *     <ul>is well-formed</ul>
+     *     <ul>contains a scheme</ul>
+     *     <ul>uses either HTTP, HTTPS, or file protocols</ul>
+     * </li>
+     *
+     * No effort is made to connect to the URL in the validation step.
+     */
+
+    public URL validateUrl(String name) {
+        String value = validateString(name);
+        URL url;
+
+        try {
+            url = new URL(value);
+        } catch (MalformedURLException e) {
+            throw new ConfigException(name, value, String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, value, e.getMessage()));
+        }
+
+        String protocol = url.getProtocol();
+
+        if (protocol == null || protocol.trim().isEmpty())
+            throw new ConfigException(name, value, String.format("The OAuth configuration option %s contains a URL (%s) that is missing the protocol", name, value));
+
+        protocol = protocol.toLowerCase(Locale.ROOT);
+
+        if (!(protocol.equals("http") || protocol.equals("https") || protocol.equals("file")))
+            throw new ConfigException(name, value, String.format("The OAuth configuration option %s contains a URL (%s) that contains an invalid protocol (%s); only \"http\", \"https\", and \"file\" protocol are supported", name, value, protocol));
+
+        return url;
+    }
+
+    public String validateString(String name) throws ValidateException {
+        return validateString(name, true);
+    }
+
+    public String validateString(String name, boolean isRequired) throws ValidateException {
+        String value = get(name);
+
+        if (value == null) {
+            if (isRequired)
+                throw new ConfigException(String.format("The OAuth configuration option %s value must be non-null", name));
+            else
+                return null;
+        }
+
+        value = value.trim();
+
+        if (value.isEmpty()) {
+            if (isRequired)
+                throw new ConfigException(String.format("The OAuth configuration option %s value must not contain only whitespace", name));
+            else
+                return null;
+        }
+
+        return value;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T get(String name) {
+        T value = (T) configs.get(prefix + name);
+
+        if (value != null)
+            return value;
+
+        return (T) configs.get(name);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/FileTokenRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/FileTokenRetriever.java
new file mode 100644
index 0000000..3ffa4c8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/FileTokenRetriever.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * <code>FileTokenRetriever</code> is an {@link AccessTokenRetriever} that will load the contents,
+ * interpreting them as a JWT access key in the serialized form.
+ *
+ * @see AccessTokenRetriever
+ */
+
+public class FileTokenRetriever implements AccessTokenRetriever {
+
+    private final Path accessTokenFile;
+
+    private String accessToken;
+
+    public FileTokenRetriever(Path accessTokenFile) {
+        this.accessTokenFile = accessTokenFile;
+    }
+
+    @Override
+    public void init() throws IOException {
+        this.accessToken = Utils.readFileAsString(accessTokenFile.toFile().getPath());
+    }
+
+    @Override
+    public String retrieve() throws IOException {
+        if (accessToken == null)
+            throw new IllegalStateException("Access token is null; please call init() first");
+
+        return accessToken;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
new file mode 100644
index 0000000..b52952a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>HttpAccessTokenRetriever</code> is an {@link AccessTokenRetriever} that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+    private static final Set<Integer> UNRETRYABLE_HTTP_CODES;
+
+    private static final int MAX_RESPONSE_BODY_LENGTH = 1000;
+
+    public static final String AUTHORIZATION_HEADER = "Authorization";
+
+    static {
+        // This does not have to be an exhaustive list. There are other HTTP codes that
+        // are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585)
+        // that we won't worry about yet. The worst case if a status code is missing from
+        // this set is that the request will be retried.
+        UNRETRYABLE_HTTP_CODES = new HashSet<>();
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+    }
+
+    private final String clientId;
+
+    private final String clientSecret;
+
+    private final String scope;
+
+    private final SSLSocketFactory sslSocketFactory;
+
+    private final String tokenEndpointUrl;
+
+    private final long loginRetryBackoffMs;
+
+    private final long loginRetryBackoffMaxMs;
+
+    private final Integer loginConnectTimeoutMs;
+
+    private final Integer loginReadTimeoutMs;
+
+    public HttpAccessTokenRetriever(String clientId,
+        String clientSecret,
+        String scope,
+        SSLSocketFactory sslSocketFactory,
+        String tokenEndpointUrl,
+        long loginRetryBackoffMs,
+        long loginRetryBackoffMaxMs,
+        Integer loginConnectTimeoutMs,
+        Integer loginReadTimeoutMs) {
+        this.clientId = Objects.requireNonNull(clientId);
+        this.clientSecret = Objects.requireNonNull(clientSecret);
+        this.scope = scope;
+        this.sslSocketFactory = sslSocketFactory;
+        this.tokenEndpointUrl = Objects.requireNonNull(tokenEndpointUrl);
+        this.loginRetryBackoffMs = loginRetryBackoffMs;
+        this.loginRetryBackoffMaxMs = loginRetryBackoffMaxMs;
+        this.loginConnectTimeoutMs = loginConnectTimeoutMs;
+        this.loginReadTimeoutMs = loginReadTimeoutMs;
+    }
+
+    /**
+     * Retrieves a JWT access token in its serialized three-part form. The implementation
+     * is free to determine how it should be retrieved but should not perform validation
+     * on the result.
+     *
+     * <b>Note</b>: This is a blocking function and callers should be aware that the
+     * implementation communicates over a network. The facility in the
+     * {@link javax.security.auth.spi.LoginModule} from which this is ultimately called
+     * does not provide an asynchronous approach.
+     *
+     * @return Non-<code>null</code> JWT access token string
+     *
+     * @throws IOException Thrown on errors related to IO during retrieval
+     */
+
+    @Override
+    public String retrieve() throws IOException {
+        String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
+        String requestBody = formatRequestBody(scope);
+        Retry<String> retry = new Retry<>(loginRetryBackoffMs, loginRetryBackoffMaxMs);
+        Map<String, String> headers = Collections.singletonMap(AUTHORIZATION_HEADER, authorizationHeader);
+
+        String responseBody;
+
+        try {
+            responseBody = retry.execute(() -> {
+                HttpURLConnection con = null;
+
+                try {
+                    con = (HttpURLConnection) new URL(tokenEndpointUrl).openConnection();
+
+                    if (sslSocketFactory != null && con instanceof HttpsURLConnection)
+                        ((HttpsURLConnection) con).setSSLSocketFactory(sslSocketFactory);
+
+                    return post(con, headers, requestBody, loginConnectTimeoutMs, loginReadTimeoutMs);
+                } catch (IOException e) {
+                    throw new ExecutionException(e);
+                } finally {
+                    if (con != null)
+                        con.disconnect();
+                }
+            });
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IOException)
+                throw (IOException) e.getCause();
+            else
+                throw new KafkaException(e.getCause());
+        }
+
+        return parseAccessToken(responseBody);
+    }
+
+    public static String post(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        handleInput(con, headers, requestBody, connectTimeoutMs, readTimeoutMs);
+        return handleOutput(con);
+    }
+
+    private static void handleInput(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        log.debug("handleInput - starting post for {}", con.getURL());
+        con.setRequestMethod("POST");
+        con.setRequestProperty("Accept", "application/json");
+
+        if (headers != null) {
+            for (Map.Entry<String, String> header : headers.entrySet())
+                con.setRequestProperty(header.getKey(), header.getValue());
+        }
+
+        con.setRequestProperty("Cache-Control", "no-cache");
+
+        if (requestBody != null) {
+            con.setRequestProperty("Content-Length", String.valueOf(requestBody.length()));
+            con.setDoOutput(true);
+        }
+
+        con.setUseCaches(false);
+
+        if (connectTimeoutMs != null)
+            con.setConnectTimeout(connectTimeoutMs);
+
+        if (readTimeoutMs != null)
+            con.setReadTimeout(readTimeoutMs);
+
+        log.debug("handleInput - preparing to connect to {}", con.getURL());
+        con.connect();
+
+        if (requestBody != null) {
+            try (OutputStream os = con.getOutputStream()) {
+                ByteArrayInputStream is = new ByteArrayInputStream(requestBody.getBytes(StandardCharsets.UTF_8));
+                log.debug("handleInput - preparing to write request body to {}", con.getURL());
+                copy(is, os);
+            }
+        }
+    }
+
+    static String handleOutput(final HttpURLConnection con) throws IOException {
+        int responseCode = con.getResponseCode();
+        log.debug("handleOutput - responseCode: {}", responseCode);
+
+        String responseBody = null;
+
+        try (InputStream is = con.getInputStream()) {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            log.debug("handleOutput - preparing to read response body from {}", con.getURL());
+            copy(is, os);
+            responseBody = os.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
+            log.warn("handleOutput - error retrieving data", e);
+        }
+
+        if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+            log.debug("handleOutput - responseCode: {}, response: {}", responseCode, responseBody);
+
+            if (responseBody == null || responseBody.isEmpty())
+                throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %s from %s", responseCode, con.getURL()));
+
+            return responseBody;
+        } else {
+            log.warn("handleOutput - error response code: {}, error response body: {}", responseCode, responseBody);
+
+            if (UNRETRYABLE_HTTP_CODES.contains(responseCode)) {
+                // We know that this is a non-transient error, so let's not keep retrying the
+                // request unnecessarily.
+                throw new UnretryableException(new IOException(String.format("The response code %s was encountered reading the token endpoint response; will not attempt further retries", responseCode)));
+            } else {
+                // We don't know if this is a transient (retryable) error or not, so let's assume
+                // it is.
+                throw new IOException(String.format("The unexpected response code %s was encountered reading the token endpoint response", responseCode));
+            }
+        }
+    }
+
+    static void copy(InputStream is, OutputStream os) throws IOException {
+        byte[] buf = new byte[4096];
+        int b;
+
+        while ((b = is.read(buf)) != -1)
+            os.write(buf, 0, b);
+    }
+
+    static String parseAccessToken(String responseBody) throws IOException {
+        log.debug("parseAccessToken - responseBody: {}", responseBody);
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode rootNode = mapper.readTree(responseBody);
+        JsonNode accessTokenNode = rootNode.at("/access_token");
+
+        if (accessTokenNode == null) {
+            // Only grab the first N characters so that if the response body is huge, we don't
+            // blow up.
+            String snippet = responseBody;
+
+            if (snippet.length() > MAX_RESPONSE_BODY_LENGTH) {
+                int actualLength = responseBody.length();
+                String s = responseBody.substring(0, MAX_RESPONSE_BODY_LENGTH);
+                snippet = String.format("%s (trimmed to first %s characters out of %s total)", s, MAX_RESPONSE_BODY_LENGTH, actualLength);
+            }
+
+            throw new IOException(String.format("The token endpoint response did not contain an access_token value. Response: (%s)", snippet));
+        }
+
+        return sanitizeString("the token endpoint response's access_token JSON attribute", accessTokenNode.textValue());
+    }
+
+    static String formatAuthorizationHeader(String clientId, String clientSecret) {
+        clientId = sanitizeString("the token endpoint request client ID parameter", clientId);
+        clientSecret = sanitizeString("the token endpoint request client secret parameter", clientSecret);
+
+        String s = String.format("%s:%s", clientId, clientSecret);
+        String encoded = Base64.getUrlEncoder().encodeToString(Utils.utf8(s));
+        return String.format("Basic %s", encoded);
+    }
+
+    static String formatRequestBody(String scope) throws IOException {
+        try {
+            StringBuilder requestParameters = new StringBuilder();
+            requestParameters.append("grant_type=client_credentials");
+
+            if (scope != null && !scope.trim().isEmpty()) {
+                scope = scope.trim();
+                String encodedScope = URLEncoder.encode(scope, StandardCharsets.UTF_8.name());
+                requestParameters.append("&scope=").append(encodedScope);
+            }
+
+            return requestParameters.toString();
+        } catch (UnsupportedEncodingException e) {
+            // The world has gone crazy!
+            throw new IOException(String.format("Encoding %s not supported", StandardCharsets.UTF_8.name()));
+        }
+    }
+
+    private static String sanitizeString(String name, String value) {
+        if (value == null)
+            throw new IllegalArgumentException(String.format("The value for %s must be non-null", name));
+
+        if (value.isEmpty())
+            throw new IllegalArgumentException(String.format("The value for %s must be non-empty", name));
+
+        value = value.trim();
+
+        if (value.isEmpty())
+            throw new IllegalArgumentException(String.format("The value for %s must not contain only whitespace", name));
+
+        return value;
+    }
+
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Initable.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Initable.java
new file mode 100644
index 0000000..bf4115e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Initable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+
+public interface Initable {
+
+    /**
+     * Lifecycle method to perform any one-time initialization of the retriever. This must
+     * be performed by the caller to ensure the correct state before methods are invoked.
+     *
+     * @throws IOException Thrown on errors related to IO during initialization
+     */
+
+    default void init() throws IOException {
+        // This method left intentionally blank.
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JaasOptionsUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JaasOptionsUtils.java
new file mode 100644
index 0000000..e728881
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JaasOptionsUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.net.ssl.SSLSocketFactory;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>JaasOptionsUtils</code> is a utility class to perform logic for the JAAS options and
+ * is separated out here for easier, more direct testing.
+ */
+
+public class JaasOptionsUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(JaasOptionsUtils.class);
+
+    private final Map<String, Object> options;
+
+    public JaasOptionsUtils(Map<String, Object> options) {
+        this.options = options;
+    }
+
+    public static Map<String, Object> getOptions(String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size()));
+
+        return Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+    }
+
+    public boolean shouldCreateSSLSocketFactory(URL url) {
+        return url.getProtocol().equalsIgnoreCase("https");
+    }
+
+    public Map<String, ?> getSslClientConfig() {
+        ConfigDef sslConfigDef = new ConfigDef();
+        sslConfigDef.withClientSslSupport();
+        AbstractConfig sslClientConfig = new AbstractConfig(sslConfigDef, options);
+        return sslClientConfig.values();
+    }
+
+    public SSLSocketFactory createSSLSocketFactory() {
+        Map<String, ?> sslClientConfig = getSslClientConfig();
+        SslFactory sslFactory = new SslFactory(Mode.CLIENT);
+        sslFactory.configure(sslClientConfig);
+        SSLSocketFactory socketFactory = ((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext().getSocketFactory();
+        log.debug("Created SSLSocketFactory: {}", sslClientConfig);
+        return socketFactory;
+    }
+
+    public String validateString(String name) throws ValidateException {
+        return validateString(name, true);
+    }
+
+    public String validateString(String name, boolean isRequired) throws ValidateException {
+        String value = (String) options.get(name);
+
+        if (value == null) {
+            if (isRequired)
+                throw new ConfigException(String.format("The OAuth configuration option %s value must be non-null", name));
+            else
+                return null;
+        }
+
+        value = value.trim();
+
+        if (value.isEmpty()) {
+            if (isRequired)
+                throw new ConfigException(String.format("The OAuth configuration option %s value must not contain only whitespace", name));
+            else
+                return null;
+        }
+
+        return value;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JwksFileVerificationKeyResolver.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JwksFileVerificationKeyResolver.java
new file mode 100644
index 0000000..19ed749
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JwksFileVerificationKeyResolver.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.security.Key;
+import java.util.List;
+import org.apache.kafka.common.utils.Utils;
+import org.jose4j.jwk.JsonWebKeySet;
+import org.jose4j.jws.JsonWebSignature;
+import org.jose4j.jwx.JsonWebStructure;
+import org.jose4j.keys.resolvers.JwksVerificationKeyResolver;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.jose4j.lang.JoseException;
+import org.jose4j.lang.UnresolvableKeyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>JwksFileVerificationKeyResolver</code> is a {@link VerificationKeyResolver} implementation
+ * that will load the JWKS from the given file system directory.
+ *
+ * A <a href="https://datatracker.ietf.org/doc/html/rfc7517#section-5">JWKS (JSON Web Key Set)</a>
+ * is a JSON document provided by the OAuth/OIDC provider that lists the keys used to sign the JWTs
+ * it issues.
+ *
+ * Here is a sample JWKS JSON document:
+ *
+ * <pre>
+ * {
+ *   "keys": [
+ *     {
+ *       "kty": "RSA",
+ *       "alg": "RS256",
+ *       "kid": "abc123",
+ *       "use": "sig",
+ *       "e": "AQAB",
+ *       "n": "..."
+ *     },
+ *     {
+ *       "kty": "RSA",
+ *       "alg": "RS256",
+ *       "kid": "def456",
+ *       "use": "sig",
+ *       "e": "AQAB",
+ *       "n": "..."
+ *     }
+ *   ]
+ * }
+ * </pre>
+ *
+ * Without going into too much detail, the array of keys enumerates the key data that the provider
+ * is using to sign the JWT. The key ID (<code>kid</code>) is referenced by the JWT's header in
+ * order to match up the JWT's signing key with the key in the JWKS. During the validation step of
+ * the broker, the jose4j OAuth library will use the contents of the appropriate key in the JWKS
+ * to validate the signature.
+ *
+ * Given that the JWKS is referenced by the JWT, the JWKS must be made available by the
+ * OAuth/OIDC provider so that a JWT can be validated.
+ *
+ * @see org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL
+ * @see VerificationKeyResolver
+ */
+
+public class JwksFileVerificationKeyResolver implements CloseableVerificationKeyResolver {
+
+    private static final Logger log = LoggerFactory.getLogger(JwksFileVerificationKeyResolver.class);
+
+    private final Path jwksFile;
+
+    private VerificationKeyResolver delegate;
+
+    public JwksFileVerificationKeyResolver(Path jwksFile) {
+        this.jwksFile = jwksFile;
+    }
+
+    @Override
+    public void init() throws IOException {
+        log.debug("Starting creation of new VerificationKeyResolver from {}", jwksFile);
+        String json = Utils.readFileAsString(jwksFile.toFile().getPath());
+
+        JsonWebKeySet jwks;
+
+        try {
+            jwks = new JsonWebKeySet(json);
+        } catch (JoseException e) {
+            throw new IOException(e);
+        }
+
+        delegate = new JwksVerificationKeyResolver(jwks.getJsonWebKeys());
+    }
+
+    @Override
+    public Key resolveKey(JsonWebSignature jws, List<JsonWebStructure> nestingContext) throws UnresolvableKeyException {
+        if (delegate == null)
+            throw new UnresolvableKeyException("VerificationKeyResolver delegate is null; please call init() first");
+
+        return delegate.resolveKey(jws, nestingContext);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidator.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidator.java
new file mode 100644
index 0000000..5816055
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidator.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
+import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException;
+import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LoginAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used
+ * by the client to perform some rudimentary validation of the JWT access token that is received
+ * as part of the response from posting the client credentials to the OAuth/OIDC provider's
+ * token endpoint.
+ *
+ * The validation steps performed are:
+ *
+ * <ol>
+ *     <li>
+ *         Basic structural validation of the <code>b64token</code> value as defined in
+ *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a>
+ *     </li>
+ *     <li>Basic conversion of the token into an in-memory map</li>
+ *     <li>Presence of scope, <code>exp</code>, subject, and <code>iat</code> claims</li>
+ * </ol>
+ */
+
+public class LoginAccessTokenValidator implements AccessTokenValidator {
+
+    private static final Logger log = LoggerFactory.getLogger(LoginAccessTokenValidator.class);
+
+    public static final String EXPIRATION_CLAIM_NAME = "exp";
+
+    public static final String ISSUED_AT_CLAIM_NAME = "iat";
+
+    private final String scopeClaimName;
+
+    private final String subClaimName;
+
+    /**
+     * Creates a new LoginAccessTokenValidator that will be used by the client for lightweight
+     * validation of the JWT.
+     *
+     * @param scopeClaimName Name of the scope claim to use; must be non-<code>null</code>
+     * @param subClaimName   Name of the subject claim to use; must be non-<code>null</code>
+     */
+
+    public LoginAccessTokenValidator(String scopeClaimName, String subClaimName) {
+        this.scopeClaimName = ClaimValidationUtils.validateClaimNameOverride(DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, scopeClaimName);
+        this.subClaimName = ClaimValidationUtils.validateClaimNameOverride(DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME, subClaimName);
+    }
+
+    /**
+     * Accepts an OAuth JWT access token in base-64 encoded format, validates, and returns an
+     * OAuthBearerToken.
+     *
+     * @param accessToken Non-<code>null</code> JWT access token
+     * @return {@link OAuthBearerToken}
+     * @throws ValidateException Thrown on errors performing validation of given token
+     */
+
+    @SuppressWarnings("unchecked")
+    public OAuthBearerToken validate(String accessToken) throws ValidateException {
+        log.debug("validate - accessToken: {}", accessToken);
+        SerializedJwt serializedJwt = new SerializedJwt(accessToken);
+        Map<String, Object> payload;
+
+        try {
+            payload = OAuthBearerUnsecuredJws.toMap(serializedJwt.getPayload());
+        } catch (OAuthBearerIllegalTokenException e) {
+            throw new ValidateException(String.format("Could not validate the access token: %s", e.getMessage()), e);
+        }
+
+        Object scopeRaw = getClaim(payload, scopeClaimName);
+        Collection<String> scopeRawCollection;
+
+        if (scopeRaw instanceof String)
+            scopeRawCollection = Collections.singletonList((String) scopeRaw);
+        else if (scopeRaw instanceof Collection)
+            scopeRawCollection = (Collection<String>) scopeRaw;
+        else
+            scopeRawCollection = Collections.emptySet();
+
+        Number expirationRaw = (Number) getClaim(payload, EXPIRATION_CLAIM_NAME);
+        String subRaw = (String) getClaim(payload, subClaimName);
+        Number issuedAtRaw = (Number) getClaim(payload, ISSUED_AT_CLAIM_NAME);
+
+        Set<String> scopes = ClaimValidationUtils.validateScopes(scopeClaimName, scopeRawCollection);
+        long expiration = ClaimValidationUtils.validateExpiration(EXPIRATION_CLAIM_NAME,
+            expirationRaw != null ? expirationRaw.longValue() * 1000L : null);
+        String subject = ClaimValidationUtils.validateSubject(subClaimName, subRaw);
+        Long issuedAt = ClaimValidationUtils.validateIssuedAt(ISSUED_AT_CLAIM_NAME,
+            issuedAtRaw != null ? issuedAtRaw.longValue() * 1000L : null);
+
+        OAuthBearerToken token = new BasicOAuthBearerToken(accessToken,
+            scopes,
+            expiration,
+            subject,
+            issuedAt);
+
+        log.debug("validate - token: {}", token);
+
+        return token;
+    }
+
+    private Object getClaim(Map<String, Object> payload, String claimName) {
+        Object value = payload.get(claimName);
+        log.debug("getClaim - {}: {}", claimName, value);
+        return value;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
new file mode 100644
index 0000000..c8dcfc9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * <code>OAuthBearerLoginCallbackHandler</code> is an {@link AuthenticateCallbackHandler} that
+ * accepts {@link OAuthBearerTokenCallback} and {@link SaslExtensionsCallback} callbacks to
+ * perform the steps to request a JWT from an OAuth/OIDC provider using the
+ * <code>clientcredentials</code>. This grant type is commonly used for non-interactive
+ * "service accounts" where there is no user available to interactively supply credentials.
+ * </p>
+ *
+ * <p>
+ * The <code>OAuthBearerLoginCallbackHandler</code> is used on the client side to retrieve a JWT
+ * and the {@link OAuthBearerValidatorCallbackHandler} is used on the broker to validate the JWT
+ * that was sent to it by the client to allow access. Both the brokers and clients will need to
+ * be configured with their appropriate callback handlers and respective configuration for OAuth
+ * functionality to work.
+ * </p>
+ *
+ * <p>
+ * Note that while this callback handler class must be specified for a Kafka client that wants to
+ * use OAuth functionality, in the case of OAuth-based inter-broker communication, the callback
+ * handler must be used on the Kafka broker side as well.
+ * {@link }
+ * </p>
+ *
+ * <p>
+ * This {@link AuthenticateCallbackHandler} is enabled by specifying its class name in the Kafka
+ * configuration. For client use, specify the class name in the
+ * {@link org.apache.kafka.common.config.SaslConfigs#SASL_LOGIN_CALLBACK_HANDLER_CLASS}
+ * configuration like so:
+ *
+ * <code>
+ * sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
+ * </code>
+ * </p>
+ *
+ * <p>
+ * If using OAuth login on the broker side (for inter-broker communication), the callback handler
+ * class will be specified with a listener-based property:
+ * <code>listener.name.<listener name>.oauthbearer.sasl.login.callback.handler.class</code> like so:
+ *
+ * <code>
+ * listener.name.<listener name>.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
+ * </code>
+ * </p>
+ *
+ * <p>
+ * The Kafka configuration must also include JAAS configuration which includes the following
+ * OAuth-specific options:
+ *
+ * <ul>
+ *     <li><code>clientId</code>OAuth client ID (required)</li>
+ *     <li><code>clientSecret</code>OAuth client secret (required)</li>
+ *     <li><code>scope</code>OAuth scope (optional)</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * The JAAS configuration can also include any SSL options that are needed. The configuration
+ * options are the same as those specified by the configuration in
+ * {@link org.apache.kafka.common.config.SslConfigs#addClientSslSupport(ConfigDef)}.
+ * </p>
+ *
+ * <p>
+ * Here's an example of the JAAS configuration for a Kafka client:
+ *
+ * <code>
+ * sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+ *   clientId="foo" \
+ *   clientSecret="bar" \
+ *   scope="baz" \
+ *   ssl.protocol="SSL" ;
+ * </code>
+ * </p>
+ *
+ * <p>
+ * The configuration option
+ * {@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}
+ * is also required in order for the client to contact the OAuth/OIDC provider. For example:
+ *
+ * <code>
+ * sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+ * </code>
+ *
+ * Please see the OAuth/OIDC providers documentation for the token endpoint URL.
+ * </p>
+ *
+ * <p>
+ * The following is a list of all the configuration options that are available for the login
+ * callback handler:
+ *
+ * <ul>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_LOGIN_CALLBACK_HANDLER_CLASS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_LOGIN_CONNECT_TIMEOUT_MS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_LOGIN_READ_TIMEOUT_MS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_LOGIN_RETRY_BACKOFF_MS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_LOGIN_RETRY_BACKOFF_MAX_MS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_JAAS_CONFIG}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_SCOPE_CLAIM_NAME}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_SUB_CLAIM_NAME}</li>
+ * </ul>
+ * </p>
+ */
+
+public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+    public static final String CLIENT_ID_CONFIG = "clientId";
+    public static final String CLIENT_SECRET_CONFIG = "clientSecret";
+    public static final String SCOPE_CONFIG = "scope";
+
+    public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " +
+        "client ID to uniquely identify the service account to use for authentication for " +
+        "this client. The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " +
+        "value and is provided to the OAuth provider using the OAuth " +
+        "clientcredentials grant type.";
+
+    public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " +
+        "client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " +
+        "account and identifies the service account to use for authentication for " +
+        "this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " +
+        "value and is provided to the OAuth provider using the OAuth " +
+        "clientcredentials grant type.";
+
+    public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " +
+        "token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " +
+        "OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " +
+        "include with the login request.";
+
+    private static final String EXTENSION_PREFIX = "extension_";
+
+    private Map<String, Object> moduleOptions;
+
+    private AccessTokenRetriever accessTokenRetriever;
+
+    private AccessTokenValidator accessTokenValidator;
+
+    private boolean isInitialized = false;
+
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
+        AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, saslMechanism, moduleOptions);
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism);
+        init(accessTokenRetriever, accessTokenValidator);
+    }
+
+    /*
+     * Package-visible for testing.
+     */
+
+    void init(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator accessTokenValidator) {
+        this.accessTokenRetriever = accessTokenRetriever;
+        this.accessTokenValidator = accessTokenValidator;
+
+        try {
+            this.accessTokenRetriever.init();
+        } catch (IOException e) {
+            throw new KafkaException("The OAuth login configuration encountered an error when initializing the AccessTokenRetriever", e);
+        }
+
+        isInitialized = true;
+    }
+
+    /*
+     * Package-visible for testing.
+     */
+
+    AccessTokenRetriever getAccessTokenRetriever() {
+        return accessTokenRetriever;
+    }
+
+    @Override
+    public void close() {
+        if (accessTokenRetriever != null) {
+            try {
+                this.accessTokenRetriever.close();
+            } catch (IOException e) {
+                log.warn("The OAuth login configuration encountered an error when closing the AccessTokenRetriever", e);
+            }
+        }
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        checkInitialized();
+
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerTokenCallback) {
+                handleTokenCallback((OAuthBearerTokenCallback) callback);
+            } else if (callback instanceof SaslExtensionsCallback) {
+                handleExtensionsCallback((SaslExtensionsCallback) callback);
+            } else {
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
+        checkInitialized();
+
+        String accessToken = accessTokenRetriever.retrieve();
+        log.debug("handle - accessToken: {}", accessToken);
+
+        try {
+            OAuthBearerToken token = accessTokenValidator.validate(accessToken);
+            log.debug("handle - token: {}", token);
+            callback.token(token);
+        } catch (ValidateException e) {
+            log.warn(e.getMessage(), e);
+            callback.error("invalid_token", e.getMessage(), null);
+        }
+    }
+
+    private void handleExtensionsCallback(SaslExtensionsCallback callback) {
+        checkInitialized();
+
+        Map<String, String> extensions = new HashMap<>();
+
+        for (Map.Entry<String, Object> configEntry : this.moduleOptions.entrySet()) {
+            String key = configEntry.getKey();
+
+            if (!key.startsWith(EXTENSION_PREFIX))
+                continue;
+
+            Object valueRaw = configEntry.getValue();
+            String value;
+
+            if (valueRaw instanceof String)
+                value = (String) valueRaw;
+            else
+                value = String.valueOf(valueRaw);
+
+            extensions.put(key.substring(EXTENSION_PREFIX.length()), value);
+        }
+
+        SaslExtensions saslExtensions = new SaslExtensions(extensions);
+
+        try {
+            OAuthBearerClientInitialResponse.validateExtensions(saslExtensions);
+        } catch (SaslException e) {
+            throw new ConfigException(e.getMessage());
+        }
+
+        callback.extensions(saslExtensions);
+    }
+
+    private void checkInitialized() {
+        if (!isInitialized)
+            throw new IllegalStateException(String.format("To use %s, first call the configure or init method", getClass().getSimpleName()));
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
new file mode 100644
index 0000000..e2c0a2e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.security.Key;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.jose4j.jws.JsonWebSignature;
+import org.jose4j.jwx.JsonWebStructure;
+import org.jose4j.lang.UnresolvableKeyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * <code>OAuthBearerValidatorCallbackHandler</code> is an {@link AuthenticateCallbackHandler} that
+ * accepts {@link OAuthBearerValidatorCallback} and {@link OAuthBearerExtensionsValidatorCallback}
+ * callbacks to implement OAuth/OIDC validation. This callback handler is intended only to be used
+ * on the Kafka broker side as it will receive a {@link OAuthBearerValidatorCallback} that includes
+ * the JWT provided by the Kafka client. That JWT is validated in terms of format, expiration,
+ * signature, and audience and issuer (if desired). This callback handler is the broker side of the
+ * OAuth functionality, whereas {@link OAuthBearerLoginCallbackHandler} is used by clients.
+ * </p>
+ *
+ * <p>
+ * This {@link AuthenticateCallbackHandler} is enabled in the broker configuration by setting the
+ * {@link org.apache.kafka.common.config.internals.BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS}
+ * like so:
+ *
+ * <code>
+ * listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
+ * </code>
+ * </p>
+ *
+ * <p>
+ * The JAAS configuration for OAuth is also needed. If using OAuth for inter-broker communication,
+ * the options are those specified in {@link OAuthBearerLoginCallbackHandler}.
+ * </p>
+ *
+ * <p>
+ * The configuration option
+ * {@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_JWKS_ENDPOINT_URL}
+ * is also required in order to contact the OAuth/OIDC provider to retrieve the JWKS for use in
+ * JWT signature validation. For example:
+ *
+ * <code>
+ * listener.name.<listener name>.oauthbearer.sasl.oauthbearer.jwks.endpoint.url=https://example.com/oauth2/v1/keys
+ * </code>
+ *
+ * Please see the OAuth/OIDC providers documentation for the JWKS endpoint URL.
+ * </p>
+ *
+ * <p>
+ * The following is a list of all the configuration options that are available for the broker
+ * validation callback handler:
+ *
+ * <ul>
+ *   <li>{@link org.apache.kafka.common.config.internals.BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_JAAS_CONFIG}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_EXPECTED_AUDIENCE}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_EXPECTED_ISSUER}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_JWKS_ENDPOINT_URL}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_SCOPE_CLAIM_NAME}</li>
+ *   <li>{@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_SUB_CLAIM_NAME}</li>
+ * </ul>
+ * </p>
+ */
+
+public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+    /**
+     * Because a {@link CloseableVerificationKeyResolver} instance can spawn threads and issue
+     * HTTP(S) calls ({@link RefreshingHttpsJwksVerificationKeyResolver}), we only want to create
+     * a new instance for each particular set of configuration. Because each set of configuration
+     * may have multiple instances, we want to reuse the single instance.
+     */
+
+    private static final Map<VerificationKeyResolverKey, CloseableVerificationKeyResolver> VERIFICATION_KEY_RESOLVER_CACHE = new HashMap<>();
+
+    private CloseableVerificationKeyResolver verificationKeyResolver;
+
+    private AccessTokenValidator accessTokenValidator;
+
+    private boolean isInitialized = false;
+
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        Map<String, Object> moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
+        CloseableVerificationKeyResolver verificationKeyResolver;
+
+        // Here's the logic which keeps our VerificationKeyResolvers down to a single instance.
+        synchronized (VERIFICATION_KEY_RESOLVER_CACHE) {
+            VerificationKeyResolverKey key = new VerificationKeyResolverKey(configs, moduleOptions);
+            verificationKeyResolver = VERIFICATION_KEY_RESOLVER_CACHE.computeIfAbsent(key, k ->
+                new RefCountingVerificationKeyResolver(VerificationKeyResolverFactory.create(configs, saslMechanism, moduleOptions)));
+        }
+
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver);
+        init(verificationKeyResolver, accessTokenValidator);
+    }
+
+    /*
+     * Package-visible for testing.
+     */
+
+    void init(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
+        this.verificationKeyResolver = verificationKeyResolver;
+        this.accessTokenValidator = accessTokenValidator;
+
+        try {
+            verificationKeyResolver.init();
+        } catch (Exception e) {
+            throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e);
+        }
+
+        isInitialized = true;
+    }
+
+    @Override
+    public void close() {
+        if (verificationKeyResolver != null) {
+            try {
+                verificationKeyResolver.close();
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        checkInitialized();
+
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerValidatorCallback) {
+                handleValidatorCallback((OAuthBearerValidatorCallback) callback);
+            } else if (callback instanceof OAuthBearerExtensionsValidatorCallback) {
+                handleExtensionsValidatorCallback((OAuthBearerExtensionsValidatorCallback) callback);
+            } else {
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    private void handleValidatorCallback(OAuthBearerValidatorCallback callback) {
+        checkInitialized();
+
+        OAuthBearerToken token;
+
+        try {
+            token = accessTokenValidator.validate(callback.tokenValue());
+            log.debug("handle - token: {}", token);
+            callback.token(token);
+        } catch (ValidateException e) {
+            log.warn(e.getMessage(), e);
+            callback.error("invalid_token", null, null);
+        }
+    }
+
+    private void handleExtensionsValidatorCallback(OAuthBearerExtensionsValidatorCallback extensionsValidatorCallback) {
+        checkInitialized();
+
+        extensionsValidatorCallback.inputExtensions().map().forEach((extensionName, v) -> extensionsValidatorCallback.valid(extensionName));
+    }
+
+    private void checkInitialized() {
+        if (!isInitialized)
+            throw new IllegalStateException(String.format("To use %s, first call the configure or init method", getClass().getSimpleName()));
+    }
+
+    /**
+     * <code>VkrKey</code> is a simple structure which encapsulates the criteria for different
+     * sets of configuration. This will allow us to use this object as a key in a {@link Map}
+     * to keep a single instance per key.
+     */
+
+    private static class VerificationKeyResolverKey {
+
+        private final Map<String, ?> configs;
+
+        private final Map<String, Object> moduleOptions;
+
+        public VerificationKeyResolverKey(Map<String, ?> configs, Map<String, Object> moduleOptions) {
+            this.configs = configs;
+            this.moduleOptions = moduleOptions;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            VerificationKeyResolverKey that = (VerificationKeyResolverKey) o;
+            return configs.equals(that.configs) && moduleOptions.equals(that.moduleOptions);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(configs, moduleOptions);
+        }
+
+    }
+
+    /**
+     * <code>RefCountingVerificationKeyResolver</code> allows us to share a single
+     * {@link CloseableVerificationKeyResolver} instance between multiple
+     * {@link AuthenticateCallbackHandler} instances and perform the lifecycle methods the
+     * appropriate number of times.
+     */
+
+    private static class RefCountingVerificationKeyResolver implements CloseableVerificationKeyResolver {
+
+        private final CloseableVerificationKeyResolver delegate;
+
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public RefCountingVerificationKeyResolver(CloseableVerificationKeyResolver delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Key resolveKey(JsonWebSignature jws, List<JsonWebStructure> nestingContext) throws UnresolvableKeyException {
+            return delegate.resolveKey(jws, nestingContext);
+        }
+
+        @Override
+        public void init() throws IOException {
+            if (count.incrementAndGet() == 1)
+                delegate.init();
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (count.decrementAndGet() == 0)
+                delegate.close();
+        }
+
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java
new file mode 100644
index 0000000..4003a44
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.kafka.common.utils.Time;
+import org.jose4j.jwk.HttpsJwks;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.lang.JoseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or
+ * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's
+ * possible to receive a JWT that contains a <code>kid</code> that points to yet-unknown JWK,
+ * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice,
+ * keys are made available for some amount of time before they're used within JWTs.
+ *
+ * This instance is created and provided to the
+ * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using
+ * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then
+ * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of
+ * a JWT.
+ *
+ * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver
+ * @see org.jose4j.keys.resolvers.VerificationKeyResolver
+ * @see ValidatorAccessTokenValidator
+ */
+
+public final class RefreshingHttpsJwks implements Initable, Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class);
+
+    private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16;
+
+    static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 60000;
+
+    static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000;
+
+    private static final int SHUTDOWN_TIMEOUT = 10;
+
+    private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS;
+
+    /**
+     * {@link HttpsJwks} does the actual work of contacting the OAuth/OIDC endpoint to get the
+     * JWKS. In some cases, the call to {@link HttpsJwks#getJsonWebKeys()} will trigger a call
+     * to {@link HttpsJwks#refresh()} which will block the current thread in network I/O. We cache
+     * the JWKS ourselves (see {@link #jsonWebKeys}) to avoid the network I/O.
+     *
+     * We want to be very careful where we use the {@link HttpsJwks} instance so that we don't
+     * perform any operation (directly or indirectly) that could cause blocking. This is because
+     * the JWKS logic is part of the larger authentication logic which operates on Kafka's network
+     * thread. It's OK to execute {@link HttpsJwks#getJsonWebKeys()} (which calls
+     * {@link HttpsJwks#refresh()}) from within {@link #init()} as that method is called only at
+     * startup, and we can afford the blocking hit there.
+     */
+
+    private final HttpsJwks httpsJwks;
+
+    private final ScheduledExecutorService executorService;
+
+    private final Time time;
+
+    private final long refreshMs;
+
+    private final long refreshRetryBackoffMs;
+
+    private final long refreshRetryBackoffMaxMs;
+
+    /**
+     * Protects {@link #missingKeyIds} and {@link #jsonWebKeys}.
+     */
+
+    private final ReadWriteLock refreshLock = new ReentrantReadWriteLock();
+
+    private final Map<String, Long> missingKeyIds;
+
+    /**
+     * Flag to prevent concurrent refresh invocations.
+     */
+
+    private final AtomicBoolean refreshInProgressFlag = new AtomicBoolean(false);
+
+    /**
+     * As mentioned in the comments for {@link #httpsJwks}, we cache the JWKS ourselves so that
+     * we can return the list immediately without any network I/O. They are only cached within
+     * calls to {@link #refresh()}.
+     */
+
+    private List<JsonWebKey> jsonWebKeys;
+
+    private boolean isInitialized;
+
+    /**
+     * Creates a <code>RefreshingHttpsJwks</code> that will be used by the
+     * {@link RefreshingHttpsJwksVerificationKeyResolver} to resolve new key IDs in JWTs.
+     *
+     * @param time                     {@link Time} instance
+     * @param httpsJwks                {@link HttpsJwks} instance from which to retrieve the JWKS
+     *                                 based on the OAuth/OIDC standard
+     * @param refreshMs                The number of milliseconds between refresh passes to connect
+     *                                 to the OAuth/OIDC JWKS endpoint to retrieve the latest set
+     * @param refreshRetryBackoffMs    Time for delay after initial failed attempt to retrieve JWKS
+     * @param refreshRetryBackoffMaxMs Maximum time to retrieve JWKS
+     */
+
+    public RefreshingHttpsJwks(Time time,
+        HttpsJwks httpsJwks,
+        long refreshMs,
+        long refreshRetryBackoffMs,
+        long refreshRetryBackoffMaxMs) {
+        if (refreshMs <= 0)
+            throw new IllegalArgumentException("JWKS validation key refresh configuration value retryWaitMs value must be positive");
+
+        this.httpsJwks = httpsJwks;
+        this.time = time;
+        this.refreshMs = refreshMs;
+        this.refreshRetryBackoffMs = refreshRetryBackoffMs;
+        this.refreshRetryBackoffMaxMs = refreshRetryBackoffMaxMs;
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.missingKeyIds = new LinkedHashMap<String, Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
+                return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES;
+            }
+        };
+    }
+
+    @Override
+    public void init() throws IOException {
+        try {
+            log.debug("init started");
+
+            List<JsonWebKey> localJWKs;
+
+            try {
+                localJWKs = httpsJwks.getJsonWebKeys();
+            } catch (JoseException e) {
+                throw new IOException("Could not refresh JWKS", e);
+            }
+
+            try {
+                refreshLock.writeLock().lock();
+                jsonWebKeys = Collections.unmodifiableList(localJWKs);
+            } finally {
+                refreshLock.writeLock().unlock();
+            }
+
+            // Since we just grabbed the keys (which will have invoked a HttpsJwks.refresh()
+            // internally), we can delay our first invocation by refreshMs.
+            //
+            // Note: we refer to this as a _scheduled_ refresh.
+            executorService.scheduleAtFixedRate(this::refresh,
+                refreshMs,
+                refreshMs,
+                TimeUnit.MILLISECONDS);
+
+            log.info("JWKS validation key refresh thread started with a refresh interval of {} ms", refreshMs);
+        } finally {
+            isInitialized = true;
+
+            log.debug("init completed");
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            log.debug("close started");
+
+            try {
+                log.debug("JWKS validation key refresh thread shutting down");
+                executorService.shutdown();
+
+                if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT, SHUTDOWN_TIME_UNIT)) {
+                    log.warn("JWKS validation key refresh thread termination did not end after {} {}",
+                        SHUTDOWN_TIMEOUT, SHUTDOWN_TIME_UNIT);
+                }
+            } catch (InterruptedException e) {
+                log.warn("JWKS validation key refresh thread error during close", e);
+            }
+        } finally {
+            log.debug("close completed");
+        }
+    }
+
+    /**
+     * Our implementation avoids the blocking call within {@link HttpsJwks#refresh()} that is
+     * sometimes called internal to {@link HttpsJwks#getJsonWebKeys()}. We want to avoid any
+     * blocking I/O as this code is running in the authentication path on the Kafka network thread.
+     *
+     * The list may be stale up to {@link #refreshMs}.
+     *
+     * @return {@link List} of {@link JsonWebKey} instances
+     *
+     * @throws JoseException Thrown if a problem is encountered parsing the JSON content into JWKs
+     * @throws IOException Thrown f a problem is encountered making the HTTP request
+     */
+
+    public List<JsonWebKey> getJsonWebKeys() throws JoseException, IOException {
+        if (!isInitialized)
+            throw new IllegalStateException("Please call init() first");
+
+        try {
+            refreshLock.readLock().lock();
+            return jsonWebKeys;
+        } finally {
+            refreshLock.readLock().unlock();
+        }
+    }
+
+    public String getLocation() {
+        return httpsJwks.getLocation();
+    }
+
+    /**
+     * <p>
+     * <code>refresh</code> is an internal method that will refresh the JWKS cache and is
+     * invoked in one of two ways:
+     *
+     * <ol>
+     *     <li>Scheduled</li>
+     *     <li>Expedited</li>
+     * </ol>
+     * </p>
+     *
+     * <p>
+     * The <i>scheduled</i> refresh is scheduled in {@link #init()} and runs every
+     * {@link #refreshMs} milliseconds. An <i>expedited</i> refresh is performed when an
+     * incoming JWT refers to a key ID that isn't in our JWKS cache ({@link #jsonWebKeys})
+     * and we try to perform a refresh sooner than the next scheduled refresh.
+     * </p>
+     */
+
+    private void refresh() {
+        if (!refreshInProgressFlag.compareAndSet(false, true)) {
+            log.debug("OAuth JWKS refresh is already in progress; ignoring concurrent refresh");
+            return;
+        }
+
+        try {
+            log.info("OAuth JWKS refresh of {} starting", httpsJwks.getLocation());
+            Retry<List<JsonWebKey>> retry = new Retry<>(refreshRetryBackoffMs, refreshRetryBackoffMaxMs);
+            List<JsonWebKey> localJWKs = retry.execute(() -> {
+                try {
+                    log.debug("JWKS validation key calling refresh of {} starting", httpsJwks.getLocation());
+                    // Call the *actual* refresh implementation that will more than likely issue
+                    // HTTP(S) calls over the network.
+                    httpsJwks.refresh();
+                    List<JsonWebKey> jwks = httpsJwks.getJsonWebKeys();
+                    log.debug("JWKS validation key refresh of {} complete", httpsJwks.getLocation());
+                    return jwks;
+                } catch (Exception e) {
+                    throw new ExecutionException(e);
+                }
+            });
+
+            try {
+                refreshLock.writeLock().lock();
+
+                for (JsonWebKey jwk : localJWKs)
+                    missingKeyIds.remove(jwk.getKeyId());
+
+                jsonWebKeys = Collections.unmodifiableList(localJWKs);
+            } finally {
+                refreshLock.writeLock().unlock();
+            }
+
+            log.info("OAuth JWKS refresh of {} complete", httpsJwks.getLocation());
+        } catch (ExecutionException e) {
+            log.warn("OAuth JWKS refresh of {} encountered an error; not updating local JWKS cache", httpsJwks.getLocation(), e);
+        } finally {
+            refreshInProgressFlag.set(false);
+        }
+    }
+
+    /**
+     * <p>
+     * <code>maybeExpediteRefresh</code> is a public method that will trigger a refresh of
+     * the JWKS cache if all of the following conditions are met:
+     *
+     * <ul>
+     *     <li>The given <code>keyId</code> parameter is &lte; the
+     *     {@link #MISSING_KEY_ID_MAX_KEY_LENGTH}</li>
+     *     <li>The key isn't in the process of being expedited already</li>
+     * </ul>
+     *
+     * <p>
+     * This <i>expedited</i> refresh is scheduled immediately.
+     * </p>
+     *
+     * @param keyId JWT key ID
+     * @return <code>true</code> if an expedited refresh was scheduled, <code>false</code> otherwise
+     */
+
+    public boolean maybeExpediteRefresh(String keyId) {
+        if (keyId.length() > MISSING_KEY_ID_MAX_KEY_LENGTH) {
+            // Although there's no limit on the length of the key ID, they're generally
+            // "reasonably" short. If we have a very long key ID length, we're going to assume
+            // the JWT is malformed, and we will not actually try to resolve the key.
+            //
+            // In this case, let's prevent blowing out our memory in two ways:
+            //
+            //     1. Don't try to resolve the key as the large ID will sit in our cache
+            //     2. Report the issue in the logs but include only the first N characters
+            int actualLength = keyId.length();
+            String s = keyId.substring(0, MISSING_KEY_ID_MAX_KEY_LENGTH);
+            String snippet = String.format("%s (trimmed to first %s characters out of %s total)", s, MISSING_KEY_ID_MAX_KEY_LENGTH, actualLength);
+            log.warn("Key ID {} was too long to cache", snippet);
+            return false;
+        } else {
+            try {
+                refreshLock.writeLock().lock();
+
+                Long nextCheckTime = missingKeyIds.get(keyId);
+                long currTime = time.milliseconds();
+                log.debug("For key ID {}, nextCheckTime: {}, currTime: {}", keyId, nextCheckTime, currTime);
+
+                if (nextCheckTime == null || nextCheckTime <= currTime) {
+                    // If there's no entry in the missing key ID cache for the incoming key ID,
+                    // or it has expired, schedule a refresh ASAP.
+                    nextCheckTime = currTime + MISSING_KEY_ID_CACHE_IN_FLIGHT_MS;
+                    missingKeyIds.put(keyId, nextCheckTime);
+                    executorService.schedule(this::refresh, 0, TimeUnit.MILLISECONDS);
+                    return true;
+                } else {
+                    return false;
+                }
+            } finally {
+                refreshLock.writeLock().unlock();
+            }
+        }
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwksVerificationKeyResolver.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwksVerificationKeyResolver.java
new file mode 100644
index 0000000..b496720
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwksVerificationKeyResolver.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.security.Key;
+import java.util.List;
+import org.jose4j.jwk.HttpsJwks;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.jwk.VerificationJwkSelector;
+import org.jose4j.jws.JsonWebSignature;
+import org.jose4j.jwx.JsonWebStructure;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.jose4j.lang.JoseException;
+import org.jose4j.lang.UnresolvableKeyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>RefreshingHttpsJwksVerificationKeyResolver</code> is a
+ * {@link VerificationKeyResolver} implementation that will periodically refresh the
+ * JWKS using its {@link HttpsJwks} instance.
+ *
+ * A <a href="https://datatracker.ietf.org/doc/html/rfc7517#section-5">JWKS (JSON Web Key Set)</a>
+ * is a JSON document provided by the OAuth/OIDC provider that lists the keys used to sign the JWTs
+ * it issues.
+ *
+ * Here is a sample JWKS JSON document:
+ *
+ * <pre>
+ * {
+ *   "keys": [
+ *     {
+ *       "kty": "RSA",
+ *       "alg": "RS256",
+ *       "kid": "abc123",
+ *       "use": "sig",
+ *       "e": "AQAB",
+ *       "n": "..."
+ *     },
+ *     {
+ *       "kty": "RSA",
+ *       "alg": "RS256",
+ *       "kid": "def456",
+ *       "use": "sig",
+ *       "e": "AQAB",
+ *       "n": "..."
+ *     }
+ *   ]
+ * }
+ * </pre>
+ *
+ * Without going into too much detail, the array of keys enumerates the key data that the provider
+ * is using to sign the JWT. The key ID (<code>kid</code>) is referenced by the JWT's header in
+ * order to match up the JWT's signing key with the key in the JWKS. During the validation step of
+ * the broker, the jose4j OAuth library will use the contents of the appropriate key in the JWKS
+ * to validate the signature.
+ *
+ * Given that the JWKS is referenced by the JWT, the JWKS must be made available by the
+ * OAuth/OIDC provider so that a JWT can be validated.
+ *
+ * @see CloseableVerificationKeyResolver
+ * @see VerificationKeyResolver
+ * @see RefreshingHttpsJwks
+ * @see HttpsJwks
+ */
+
+public class RefreshingHttpsJwksVerificationKeyResolver implements CloseableVerificationKeyResolver {
+
+    private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwksVerificationKeyResolver.class);
+
+    private final RefreshingHttpsJwks refreshingHttpsJwks;
+
+    private final VerificationJwkSelector verificationJwkSelector;
+
+    private boolean isInitialized;
+
+    public RefreshingHttpsJwksVerificationKeyResolver(RefreshingHttpsJwks refreshingHttpsJwks) {
+        this.refreshingHttpsJwks = refreshingHttpsJwks;
+        this.verificationJwkSelector = new VerificationJwkSelector();
+    }
+
+    @Override
+    public void init() throws IOException {
+        try {
+            log.debug("init started");
+
+            refreshingHttpsJwks.init();
+        } finally {
+            isInitialized = true;
+
+            log.debug("init completed");
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            log.debug("close started");
+
+            refreshingHttpsJwks.close();
+        } finally {
+            log.debug("close completed");
+        }
+    }
+
+    @Override
+    public Key resolveKey(JsonWebSignature jws, List<JsonWebStructure> nestingContext) throws UnresolvableKeyException {
+        if (!isInitialized)
+            throw new IllegalStateException("Please call init() first");
+
+        try {
+            List<JsonWebKey> jwks = refreshingHttpsJwks.getJsonWebKeys();
+            JsonWebKey jwk = verificationJwkSelector.select(jws, jwks);
+
+            if (jwk != null)
+                return jwk.getKey();
+
+            String keyId = jws.getKeyIdHeaderValue();
+
+            if (refreshingHttpsJwks.maybeExpediteRefresh(keyId))
+                log.debug("Refreshing JWKs from {} as no suitable verification key for JWS w/ header {} was found in {}", refreshingHttpsJwks.getLocation(), jws.getHeaders().getFullHeaderAsJsonString(), jwks);
+
+            StringBuilder sb = new StringBuilder();
+            sb.append("Unable to find a suitable verification key for JWS w/ header ").append(jws.getHeaders().getFullHeaderAsJsonString());
+            sb.append(" from JWKs ").append(jwks).append(" obtained from ").append(
+                refreshingHttpsJwks.getLocation());
+            throw new UnresolvableKeyException(sb.toString());
+        } catch (JoseException | IOException e) {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Unable to find a suitable verification key for JWS w/ header ").append(jws.getHeaders().getFullHeaderAsJsonString());
+            sb.append(" due to an unexpected exception (").append(e).append(") while obtaining or using keys from JWKS endpoint at ").append(
+                refreshingHttpsJwks.getLocation());
+            throw new UnresolvableKeyException(sb.toString(), e);
+        }
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java
new file mode 100644
index 0000000..ffa5672
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retry encapsulates the mechanism to perform a retry and then exponential
+ * backoff using provided wait times between attempts.
+ *
+ * @param <R> Result type
+ */
+
+public class Retry<R> {
+
+    private static final Logger log = LoggerFactory.getLogger(Retry.class);
+
+    private final Time time;
+
+    private final long retryBackoffMs;
+
+    private final long retryBackoffMaxMs;
+
+    public Retry(long retryBackoffMs, long retryBackoffMaxMs) {
+        this(Time.SYSTEM, retryBackoffMs, retryBackoffMaxMs);
+    }
+
+    public Retry(Time time, long retryBackoffMs, long retryBackoffMaxMs) {
+        this.time = time;
+        this.retryBackoffMs = retryBackoffMs;
+        this.retryBackoffMaxMs = retryBackoffMaxMs;
+
+        if (this.retryBackoffMs < 0)
+            throw new IllegalArgumentException(String.format("retryBackoffMs value (%s) must be non-negative", retryBackoffMs));
+
+        if (this.retryBackoffMaxMs < 0)
+            throw new IllegalArgumentException(String.format("retryBackoffMaxMs value (%s) must be non-negative", retryBackoffMaxMs));
+
+        if (this.retryBackoffMaxMs < this.retryBackoffMs)
+            throw new IllegalArgumentException(String.format("retryBackoffMaxMs value (%s) is less than retryBackoffMs value (%s)", retryBackoffMaxMs, retryBackoffMs));
+    }
+
+    public R execute(Retryable<R> retryable) throws ExecutionException {
+        long endMs = time.milliseconds() + retryBackoffMaxMs;
+        int currAttempt = 0;
+        ExecutionException error = null;
+
+        while (time.milliseconds() <= endMs) {
+            currAttempt++;
+
+            try {
+                return retryable.call();
+            } catch (UnretryableException e) {
+                // We've deemed this error to not be worth retrying, so collect the error and
+                // fail immediately.
+                if (error == null)
+                    error = new ExecutionException(e);
+
+                break;
+            } catch (ExecutionException e) {
+                log.warn("Error during retry attempt {}", currAttempt, e);
+
+                if (error == null)
+                    error = e;
+
+                long waitMs = retryBackoffMs * (long) Math.pow(2, currAttempt - 1);
+                long diff = endMs - time.milliseconds();
+                waitMs = Math.min(waitMs, diff);
+
+                if (waitMs <= 0)
+                    break;
+
+                String message = String.format("Attempt %s to make call resulted in an error; sleeping %s ms before retrying",
+                    currAttempt, waitMs);
+                log.warn(message, e);
+
+                time.sleep(waitMs);
+            }
+        }
+
+        if (error == null)
+            // Really shouldn't ever get to here, but...
+            error = new ExecutionException(new IllegalStateException("Exhausted all retry attempts but no attempt returned value or encountered exception"));
+
+        throw error;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retryable.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retryable.java
new file mode 100644
index 0000000..67967ad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retryable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Simple interface to abstract out the call that is made so that it can be retried.
+ *
+ * @param <R> Result type
+ *
+ * @see Retry
+ * @see UnretryableException
+ */
+
+public interface Retryable<R> {
+
+    /**
+     * Perform the operation and return the data from the response.
+     *
+     * @return Return response data, formatted in the given data type
+     *
+     * @throws ExecutionException   Thrown on errors connecting, writing, reading, timeouts, etc.
+     *                              that can likely be tried again
+     * @throws UnretryableException Thrown on errors that we can determine should not be tried again
+     */
+
+    R call() throws ExecutionException, UnretryableException;
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/SerializedJwt.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/SerializedJwt.java
new file mode 100644
index 0000000..962d720
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/SerializedJwt.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+/**
+ * SerializedJwt provides a modicum of structure and validation around a JWT's serialized form by
+ * splitting and making the three sections (header, payload, and signature) available to the user.
+ */
+
+public class SerializedJwt {
+
+    private final String token;
+
+    private final String header;
+
+    private final String payload;
+
+    private final String signature;
+
+    public SerializedJwt(String token) {
+        if (token == null)
+            token = "";
+        else
+            token = token.trim();
+
+        if (token.isEmpty())
+            throw new ValidateException("Empty JWT provided; expected three sections (header, payload, and signature)");
+
+        String[] splits = token.split("\\.");
+
+        if (splits.length != 3)
+            throw new ValidateException(String.format("Malformed JWT provided (%s); expected three sections (header, payload, and signature), but %s sections provided",
+                token, splits.length));
+
+        this.token = token.trim();
+        this.header = validateSection(splits[0], "header");
+        this.payload = validateSection(splits[1], "payload");
+        this.signature = validateSection(splits[2], "signature");
+    }
+
+    /**
+     * Returns the entire base 64-encoded JWT.
+     *
+     * @return JWT
+     */
+
+    public String getToken() {
+        return token;
+    }
+
+    /**
+     * Returns the first section--the JWT header--in its base 64-encoded form.
+     *
+     * @return Header section of the JWT
+     */
+
+    public String getHeader() {
+        return header;
+    }
+
+    /**
+     * Returns the second section--the JWT payload--in its base 64-encoded form.
+     *
+     * @return Payload section of the JWT
+     */
+
+    public String getPayload() {
+        return payload;
+    }
+
+    /**
+     * Returns the third section--the JWT signature--in its base 64-encoded form.
+     *
+     * @return Signature section of the JWT
+     */
+
+    public String getSignature() {
+        return signature;
+    }
+
+    private String validateSection(String section, String sectionName) throws ValidateException {
+        section = section.trim();
+
+        if (section.isEmpty())
+            throw new ValidateException(String.format(
+                "Malformed JWT provided; expected at least three sections (header, payload, and signature), but %s section missing",
+                sectionName));
+
+        return section;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/UnretryableException.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/UnretryableException.java
new file mode 100644
index 0000000..1964cfb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/UnretryableException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import org.apache.kafka.common.KafkaException;
+
+public class UnretryableException extends KafkaException {
+
+    public UnretryableException(String message) {
+        super(message);
+    }
+
+    public UnretryableException(Throwable cause) {
+        super(cause);
+    }
+
+    public UnretryableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidateException.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidateException.java
new file mode 100644
index 0000000..2ebebeb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidateException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import javax.security.auth.callback.Callback;
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * ValidateException is thrown in cases where a JWT access token cannot be determined to be
+ * valid for one reason or another. It is intended to be used when errors arise within the
+ * processing of a {@link javax.security.auth.callback.CallbackHandler#handle(Callback[])}.
+ * This error, however, is not thrown from that method directly.
+ *
+ * @see AccessTokenValidator#validate(String)
+ */
+
+public class ValidateException extends KafkaException {
+
+    public ValidateException(String message) {
+        super(message);
+    }
+
+    public ValidateException(Throwable cause) {
+        super(cause);
+    }
+
+    public ValidateException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java
new file mode 100644
index 0000000..4d95635
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.jose4j.jwt.JwtClaims;
+import org.jose4j.jwt.MalformedClaimException;
+import org.jose4j.jwt.NumericDate;
+import org.jose4j.jwt.ReservedClaimNames;
+import org.jose4j.jwt.consumer.InvalidJwtException;
+import org.jose4j.jwt.consumer.JwtConsumer;
+import org.jose4j.jwt.consumer.JwtConsumerBuilder;
+import org.jose4j.jwt.consumer.JwtContext;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used
+ * by the broker to perform more extensive validation of the JWT access token that is received
+ * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's
+ * token endpoint.
+ *
+ * The validation steps performed (primary by the jose4j library) are:
+ *
+ * <ol>
+ *     <li>
+ *         Basic structural validation of the <code>b64token</code> value as defined in
+ *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a>
+ *     </li>
+ *     <li>Basic conversion of the token into an in-memory data structure</li>
+ *     <li>
+ *         Presence of scope, <code>exp</code>, subject, <code>iss</code>, and
+ *         <code>iat</code> claims
+ *     </li>
+ *     <li>
+ *         Signature matching validation against the <code>kid</code> and those provided by
+ *         the OAuth/OIDC provider's JWKS
+ *     </li>
+ * </ol>
+ */
+
+public class ValidatorAccessTokenValidator implements AccessTokenValidator {
+
+    private static final Logger log = LoggerFactory.getLogger(ValidatorAccessTokenValidator.class);
+
+    private final JwtConsumer jwtConsumer;
+
+    private final String scopeClaimName;
+
+    private final String subClaimName;
+
+    /**
+     * Creates a new ValidatorAccessTokenValidator that will be used by the broker for more
+     * thorough validation of the JWT.
+     *
+     * @param clockSkew               The optional value (in seconds) to allow for differences
+     *                                between the time of the OAuth/OIDC identity provider and
+     *                                the broker. If <code>null</code> is provided, the broker
+     *                                and the OAUth/OIDC identity provider are assumed to have
+     *                                very close clock settings.
+     * @param expectedAudiences       The (optional) set the broker will use to verify that
+     *                                the JWT was issued for one of the expected audiences.
+     *                                The JWT will be inspected for the standard OAuth
+     *                                <code>aud</code> claim and if this value is set, the
+     *                                broker will match the value from JWT's <code>aud</code>
+     *                                claim to see if there is an <b>exact</b> match. If there is no
+     *                                match, the broker will reject the JWT and authentication
+     *                                will fail. May be <code>null</code> to not perform any
+     *                                check to verify the JWT's <code>aud</code> claim matches any
+     *                                fixed set of known/expected audiences.
+     * @param expectedIssuer          The (optional) value for the broker to use to verify that
+     *                                the JWT was created by the expected issuer. The JWT will
+     *                                be inspected for the standard OAuth <code>iss</code> claim
+     *                                and if this value is set, the broker will match it
+     *                                <b>exactly</b> against what is in the JWT's <code>iss</code>
+     *                                claim. If there is no match, the broker will reject the JWT
+     *                                and authentication will fail. May be <code>null</code> to not
+     *                                perform any check to verify the JWT's <code>iss</code> claim
+     *                                matches a specific issuer.
+     * @param verificationKeyResolver jose4j-based {@link VerificationKeyResolver} that is used
+     *                                to validate the signature matches the contents of the header
+     *                                and payload
+     * @param scopeClaimName          Name of the scope claim to use; must be non-<code>null</code>
+     * @param subClaimName            Name of the subject claim to use; must be
+     *                                non-<code>null</code>
+     *
+     * @see JwtConsumerBuilder
+     * @see JwtConsumer
+     * @see VerificationKeyResolver
+     */
+
+    public ValidatorAccessTokenValidator(Integer clockSkew,
+        Set<String> expectedAudiences,
+        String expectedIssuer,
+        VerificationKeyResolver verificationKeyResolver,
+        String scopeClaimName,
+        String subClaimName) {
+        final JwtConsumerBuilder jwtConsumerBuilder = new JwtConsumerBuilder();
+
+        if (clockSkew != null)
+            jwtConsumerBuilder.setAllowedClockSkewInSeconds(clockSkew);
+
+        if (expectedAudiences != null && !expectedAudiences.isEmpty())
+            jwtConsumerBuilder.setExpectedAudience(expectedAudiences.toArray(new String[0]));
+
+        if (expectedIssuer != null)
+            jwtConsumerBuilder.setExpectedIssuer(expectedIssuer);
+
+        this.jwtConsumer = jwtConsumerBuilder
+            .setJwsAlgorithmConstraints(DISALLOW_NONE)
+            .setRequireExpirationTime()
+            .setRequireIssuedAt()
+            .setRequireSubject()
+            .setVerificationKeyResolver(verificationKeyResolver)
+            .build();
+        this.scopeClaimName = scopeClaimName;
+        this.subClaimName = subClaimName;
+    }
+
+    /**
+     * Accepts an OAuth JWT access token in base-64 encoded format, validates, and returns an
+     * OAuthBearerToken.
+     *
+     * @param accessToken Non-<code>null</code> JWT access token
+     * @return {@link OAuthBearerToken}
+     * @throws ValidateException Thrown on errors performing validation of given token
+     */
+
+    @SuppressWarnings("unchecked")
+    public OAuthBearerToken validate(String accessToken) throws ValidateException {
+        log.debug("validate - accessToken: {}", accessToken);
+        SerializedJwt serializedJwt = new SerializedJwt(accessToken);
+
+        JwtContext jwt;
+
+        try {
+            jwt = jwtConsumer.process(serializedJwt.getToken());
+        } catch (InvalidJwtException e) {
+            throw new ValidateException(String.format("Could not validate the access token: %s", e.getMessage()), e);
+        }
+
+        JwtClaims claims = jwt.getJwtClaims();
+
+        Object scopeRaw = getClaim(() -> claims.getClaimValue(scopeClaimName), scopeClaimName);
+        Collection<String> scopeRawCollection;
+
+        if (scopeRaw instanceof String)
+            scopeRawCollection = Collections.singletonList((String) scopeRaw);
+        else if (scopeRaw instanceof Collection)
+            scopeRawCollection = (Collection<String>) scopeRaw;
+        else
+            scopeRawCollection = Collections.emptySet();
+
+        NumericDate expirationRaw = getClaim(claims::getExpirationTime, ReservedClaimNames.EXPIRATION_TIME);
+        String subRaw = getClaim(() -> claims.getStringClaimValue(subClaimName), subClaimName);
+        NumericDate issuedAtRaw = getClaim(claims::getIssuedAt, ReservedClaimNames.ISSUED_AT);
+
+        Set<String> scopes = ClaimValidationUtils.validateScopes(scopeClaimName, scopeRawCollection);
+        long expiration = ClaimValidationUtils.validateExpiration(ReservedClaimNames.EXPIRATION_TIME,
+            expirationRaw != null ? expirationRaw.getValueInMillis() : null);
+        String sub = ClaimValidationUtils.validateSubject(subClaimName, subRaw);
+        Long issuedAt = ClaimValidationUtils.validateIssuedAt(ReservedClaimNames.ISSUED_AT,
+            issuedAtRaw != null ? issuedAtRaw.getValueInMillis() : null);
+
+        OAuthBearerToken token = new BasicOAuthBearerToken(accessToken,
+            scopes,
+            expiration,
+            sub,
+            issuedAt);
+
+        log.debug("validate - token: {}", token);
+
+        return token;
+    }
+
+    private <T> T getClaim(ClaimSupplier<T> supplier, String claimName) throws ValidateException {
+        try {
+            T value = supplier.get();
+            log.debug("getClaim - {}: {}", claimName, value);
+            return value;
+        } catch (MalformedClaimException e) {
+            throw new ValidateException(String.format("Could not extract the '%s' claim from the access token", claimName), e);
+        }
+    }
+
+    public interface ClaimSupplier<T> {
+
+        T get() throws MalformedClaimException;
+
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/VerificationKeyResolverFactory.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/VerificationKeyResolverFactory.java
new file mode 100644
index 0000000..b6ec46a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/VerificationKeyResolverFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.utils.Time;
+import org.jose4j.http.Get;
+import org.jose4j.jwk.HttpsJwks;
+
+public class VerificationKeyResolverFactory {
+
+    /**
+     * Create an {@link AccessTokenRetriever} from the given
+     * {@link org.apache.kafka.common.config.SaslConfigs}.
+     *
+     * <b>Note</b>: the returned <code>CloseableVerificationKeyResolver</code> is not
+     * initialized here and must be done by the caller.
+     *
+     * Primarily exposed here for unit testing.
+     *
+     * @param configs SASL configuration
+     *
+     * @return Non-<code>null</code> {@link CloseableVerificationKeyResolver}
+     */
+    public static CloseableVerificationKeyResolver create(Map<String, ?> configs,
+        Map<String, Object> jaasConfig) {
+        return create(configs, null, jaasConfig);
+    }
+
+    public static CloseableVerificationKeyResolver create(Map<String, ?> configs,
+        String saslMechanism,
+        Map<String, Object> jaasConfig) {
+        ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
+        URL jwksEndpointUrl = cu.validateUrl(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
+
+        if (jwksEndpointUrl.getProtocol().toLowerCase(Locale.ROOT).equals("file")) {
+            Path p = cu.validateFile(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
+            return new JwksFileVerificationKeyResolver(p);
+        } else {
+            long refreshIntervalMs = cu.validateLong(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, true, 0L);
+            JaasOptionsUtils jou = new JaasOptionsUtils(jaasConfig);
+            SSLSocketFactory sslSocketFactory = null;
+
+            if (jou.shouldCreateSSLSocketFactory(jwksEndpointUrl))
+                sslSocketFactory = jou.createSSLSocketFactory();
+
+            HttpsJwks httpsJwks = new HttpsJwks(jwksEndpointUrl.toString());
+            httpsJwks.setDefaultCacheDuration(refreshIntervalMs);
+
+            if (sslSocketFactory != null) {
+                Get get = new Get();
+                get.setSslSocketFactory(sslSocketFactory);
+                httpsJwks.setSimpleHttpGet(get);
+            }
+
+            RefreshingHttpsJwks refreshingHttpsJwks = new RefreshingHttpsJwks(Time.SYSTEM,
+                httpsJwks,
+                refreshIntervalMs,
+                cu.validateLong(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS),
+                cu.validateLong(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS));
+            return new RefreshingHttpsJwksVerificationKeyResolver(refreshingHttpsJwks);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenBuilder.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenBuilder.java
new file mode 100644
index 0000000..20def92
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenBuilder.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.jose4j.jwk.RsaJsonWebKey;
+import org.jose4j.jwk.RsaJwkGenerator;
+import org.jose4j.jws.AlgorithmIdentifiers;
+import org.jose4j.jws.JsonWebSignature;
+import org.jose4j.jwt.ReservedClaimNames;
+import org.jose4j.lang.JoseException;
+
+public class AccessTokenBuilder {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private String audience;
+
+    private String subject = "jdoe";
+
+    private String subjectClaimName = ReservedClaimNames.SUBJECT;
+
+    private Object scope = "engineering";
+
+    private String scopeClaimName = "scope";
+
+    private Long issuedAtSeconds;
+
+    private Long expirationSeconds;
+
+    private RsaJsonWebKey jwk;
+
+    public AccessTokenBuilder() throws JoseException {
+        this(new MockTime());
+    }
+
+    public AccessTokenBuilder(Time time) throws JoseException {
+        this.issuedAtSeconds = time.milliseconds() / 1000;
+        this.expirationSeconds = this.issuedAtSeconds + 60;
+        this.jwk = createJwk();
+    }
+
+    public static RsaJsonWebKey createJwk() throws JoseException {
+        RsaJsonWebKey jwk = RsaJwkGenerator.generateJwk(2048);
+        jwk.setKeyId("key-1");
+        return jwk;
+    }
+
+    public String audience() {
+        return audience;
+    }
+
+    public AccessTokenBuilder audience(String audience) {
+        this.audience = audience;
+        return this;
+    }
+
+    public String subject() {
+        return subject;
+    }
+
+    public AccessTokenBuilder subject(String subject) {
+        this.subject = subject;
+        return this;
+    }
+
+    public String subjectClaimName() {
+        return subjectClaimName;
+    }
+
+    public AccessTokenBuilder subjectClaimName(String subjectClaimName) {
+        this.subjectClaimName = subjectClaimName;
+        return this;
+    }
+
+    public Object scope() {
+        return scope;
+    }
+
+    public AccessTokenBuilder scope(Object scope) {
+        this.scope = scope;
+
+        if (scope instanceof String) {
+            return this;
+        } else if (scope instanceof Collection) {
+            return this;
+        } else {
+            throw new IllegalArgumentException(String.format("%s parameter must be a %s or a %s containing %s",
+                scopeClaimName,
+                String.class.getName(),
+                Collection.class.getName(),
+                String.class.getName()));
+        }
+    }
+
+    public String scopeClaimName() {
+        return scopeClaimName;
+    }
+
+    public AccessTokenBuilder scopeClaimName(String scopeClaimName) {
+        this.scopeClaimName = scopeClaimName;
+        return this;
+    }
+
+    public Long issuedAtSeconds() {
+        return issuedAtSeconds;
+    }
+
+    public AccessTokenBuilder issuedAtSeconds(Long issuedAtSeconds) {
+        this.issuedAtSeconds = issuedAtSeconds;
+        return this;
+    }
+
+    public Long expirationSeconds() {
+        return expirationSeconds;
+    }
+
+    public AccessTokenBuilder expirationSeconds(Long expirationSeconds) {
+        this.expirationSeconds = expirationSeconds;
+        return this;
+    }
+
+    public RsaJsonWebKey jwk() {
+        return jwk;
+    }
+
+    public AccessTokenBuilder jwk(RsaJsonWebKey jwk) {
+        this.jwk = jwk;
+        return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    public String build() throws JoseException, IOException {
+        ObjectNode node = objectMapper.createObjectNode();
+
+        if (audience != null)
+            node.put(ReservedClaimNames.AUDIENCE, audience);
+
+        if (subject != null)
+            node.put(subjectClaimName, subject);
+
+        if (scope instanceof String) {
+            node.put(scopeClaimName, (String) scope);
+        } else if (scope instanceof Collection) {
+            ArrayNode child = node.putArray(scopeClaimName);
+            ((Collection<String>) scope).forEach(child::add);
+        } else {
+            throw new IllegalArgumentException(String.format("%s claim must be a %s or a %s containing %s",
+                scopeClaimName,
+                String.class.getName(),
+                Collection.class.getName(),
+                String.class.getName()));
+        }
+
+        if (issuedAtSeconds != null)
+            node.put(ReservedClaimNames.ISSUED_AT, issuedAtSeconds);
+
+        if (expirationSeconds != null)
+            node.put(ReservedClaimNames.EXPIRATION_TIME, expirationSeconds);
+
+        String json = objectMapper.writeValueAsString(node);
+
+        JsonWebSignature jws = new JsonWebSignature();
+        jws.setPayload(json);
+        jws.setKey(jwk.getPrivateKey());
+        jws.setKeyIdHeaderValue(jwk.getKeyId());
+        jws.setAlgorithmHeaderValue(AlgorithmIdentifiers.RSA_USING_SHA256);
+        return jws.getCompactSerialization();
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactoryTest.java
new file mode 100644
index 0000000..5195315
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactoryTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+public class AccessTokenRetrieverFactoryTest extends OAuthBearerTest {
+
+    @Test
+    public void testConfigureRefreshingFileAccessTokenRetriever() throws Exception {
+        String expected = "{}";
+
+        File tmpDir = createTempDir("access-token");
+        File accessTokenFile = createTempFile(tmpDir, "access-token-", ".json", expected);
+
+        Map<String, ?> configs = Collections.singletonMap(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, accessTokenFile.toURI().toString());
+        Map<String, Object> jaasConfig = Collections.emptyMap();
+
+        try (AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, jaasConfig)) {
+            accessTokenRetriever.init();
+            assertEquals(expected, accessTokenRetriever.retrieve());
+        }
+    }
+
+    @Test
+    public void testConfigureRefreshingFileAccessTokenRetrieverWithInvalidDirectory() {
+        // Should fail because the parent path doesn't exist.
+        Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, new File("/tmp/this-directory-does-not-exist/foo.json").toURI().toString());
+        Map<String, Object> jaasConfig = Collections.emptyMap();
+        assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
+    }
+
+    @Test
+    public void testConfigureRefreshingFileAccessTokenRetrieverWithInvalidFile() throws Exception {
+        // Should fail because the while the parent path exists, the file itself doesn't.
+        File tmpDir = createTempDir("this-directory-does-exist");
+        File accessTokenFile = new File(tmpDir, "this-file-does-not-exist.json");
+        Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, accessTokenFile.toURI().toString());
+        Map<String, Object> jaasConfig = Collections.emptyMap();
+        assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java
new file mode 100644
index 0000000..1270674
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+public class AccessTokenValidatorFactoryTest extends OAuthBearerTest {
+
+    @Test
+    public void testConfigureThrowsExceptionOnAccessTokenValidatorInit() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() {
+            @Override
+            public void init() throws IOException {
+                throw new IOException("My init had an error!");
+            }
+            @Override
+            public String retrieve() {
+                return "dummy";
+            }
+        };
+
+        Map<String, ?> configs = getSaslConfigs();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+
+        assertThrowsWithMessage(
+            KafkaException.class, () -> handler.init(accessTokenRetriever, accessTokenValidator), "encountered an error when initializing");
+    }
+
+    @Test
+    public void testConfigureThrowsExceptionOnAccessTokenValidatorClose() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() {
+            @Override
+            public void close() throws IOException {
+                throw new IOException("My close had an error!");
+            }
+            @Override
+            public String retrieve() {
+                return "dummy";
+            }
+        };
+
+        Map<String, ?> configs = getSaslConfigs();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+        handler.init(accessTokenRetriever, accessTokenValidator);
+
+        // Basically asserting this doesn't throw an exception :(
+        handler.close();
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorTest.java
new file mode 100644
index 0000000..8407ac3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.jose4j.jws.AlgorithmIdentifiers;
+import org.jose4j.jwx.HeaderParameterNames;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public abstract class AccessTokenValidatorTest extends OAuthBearerTest {
+
+    protected abstract AccessTokenValidator createAccessTokenValidator(AccessTokenBuilder accessTokenBuilder) throws Exception;
+
+    protected AccessTokenValidator createAccessTokenValidator() throws Exception {
+        AccessTokenBuilder builder = new AccessTokenBuilder();
+        return createAccessTokenValidator(builder);
+    }
+
+    @Test
+    public void testNull() throws Exception {
+        AccessTokenValidator validator = createAccessTokenValidator();
+        assertThrowsWithMessage(ValidateException.class, () -> validator.validate(null), "Empty JWT provided");
+    }
+
+    @Test
+    public void testEmptyString() throws Exception {
+        AccessTokenValidator validator = createAccessTokenValidator();
+        assertThrowsWithMessage(ValidateException.class, () -> validator.validate(""), "Empty JWT provided");
+    }
+
+    @Test
+    public void testWhitespace() throws Exception {
+        AccessTokenValidator validator = createAccessTokenValidator();
+        assertThrowsWithMessage(ValidateException.class, () -> validator.validate("    "), "Empty JWT provided");
+    }
+
+    @Test
+    public void testEmptySections() throws Exception {
+        AccessTokenValidator validator = createAccessTokenValidator();
+        assertThrowsWithMessage(ValidateException.class, () -> validator.validate(".."), "Malformed JWT provided");
+    }
+
+    @Test
+    public void testMissingHeader() throws Exception {
+        AccessTokenValidator validator = createAccessTokenValidator();
+        String header = "";
+        String payload = createBase64JsonJwtSection(node -> { });
+        String signature = "";
+        String accessToken = String.format("%s.%s.%s", header, payload, signature);
+        assertThrows(ValidateException.class, () -> validator.validate(accessToken));
+    }
+
+    @Test
+    public void testMissingPayload() throws Exception {
+        AccessTokenValidator validator = createAccessTokenValidator();
+        String header = createBase64JsonJwtSection(node -> node.put(HeaderParameterNames.ALGORITHM, AlgorithmIdentifiers.NONE));
+        String payload = "";
+        String signature = "";
+        String accessToken = String.format("%s.%s.%s", header, payload, signature);
+        assertThrows(ValidateException.class, () -> validator.validate(accessToken));
+    }
+
+    @Test
+    public void testMissingSignature() throws Exception {
+        AccessTokenValidator validator = createAccessTokenValidator();
+        String header = createBase64JsonJwtSection(node -> node.put(HeaderParameterNames.ALGORITHM, AlgorithmIdentifiers.NONE));
+        String payload = createBase64JsonJwtSection(node -> { });
+        String signature = "";
+        String accessToken = String.format("%s.%s.%s", header, payload, signature);
+        assertThrows(ValidateException.class, () -> validator.validate(accessToken));
+    }
+
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/BasicOAuthBearerTokenTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/BasicOAuthBearerTokenTest.java
new file mode 100644
index 0000000..658d07f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/BasicOAuthBearerTokenTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.junit.jupiter.api.Test;
+
+public class BasicOAuthBearerTokenTest {
+
+    @Test
+    public void basic() {
+        OAuthBearerToken token = new BasicOAuthBearerToken("not.valid.token",
+            Collections.emptySet(),
+            0L,
+            "jdoe",
+            0L);
+        assertEquals("not.valid.token", token.value());
+        assertTrue(token.scope().isEmpty());
+        assertEquals(0L, token.lifetimeMs());
+        assertEquals("jdoe", token.principalName());
+        assertEquals(0L, token.startTimeMs());
+    }
+
+    @Test
+    public void negativeLifetime() {
+        OAuthBearerToken token = new BasicOAuthBearerToken("not.valid.token",
+            Collections.emptySet(),
+            -1L,
+            "jdoe",
+            0L);
+        assertEquals("not.valid.token", token.value());
+        assertTrue(token.scope().isEmpty());
+        assertEquals(-1L, token.lifetimeMs());
+        assertEquals("jdoe", token.principalName());
+        assertEquals(0L, token.startTimeMs());
+    }
+
+    @Test
+    public void noErrorIfModifyScope() {
+        // Start with a basic set created by the caller.
+        SortedSet<String> callerSet = new TreeSet<>(Arrays.asList("a", "b", "c"));
+        OAuthBearerToken token = new BasicOAuthBearerToken("not.valid.token",
+            callerSet,
+            0L,
+            "jdoe",
+            0L);
+
+        // Make sure it all looks good
+        assertNotNull(token.scope());
+        assertEquals(3, token.scope().size());
+
+        // Add a value to the caller's set and note that it changes the token's scope set.
+        // Make sure to make it read-only when it's passed in.
+        callerSet.add("d");
+        assertTrue(token.scope().contains("d"));
+
+        // Similarly, removing a value from the caller's will affect the token's scope set.
+        // Make sure to make it read-only when it's passed in.
+        callerSet.remove("c");
+        assertFalse(token.scope().contains("c"));
+
+        // Ensure that attempting to change the token's scope set directly will not throw any error.
+        token.scope().clear();
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ClaimValidationUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ClaimValidationUtilsTest.java
new file mode 100644
index 0000000..0aeb6f7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ClaimValidationUtilsTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.junit.jupiter.api.Test;
+
+public class ClaimValidationUtilsTest extends OAuthBearerTest {
+
+    @Test
+    public void testValidateScopes() {
+        Set<String> scopes = ClaimValidationUtils.validateScopes("scope", Arrays.asList("  a  ", "    b    "));
+
+        assertEquals(2, scopes.size());
+        assertTrue(scopes.contains("a"));
+        assertTrue(scopes.contains("b"));
+    }
+
+    @Test
+    public void testValidateScopesDisallowsDuplicates() {
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateScopes("scope", Arrays.asList("a", "b", "a")));
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateScopes("scope", Arrays.asList("a", "b", "  a  ")));
+    }
+
+    @Test
+    public void testValidateScopesDisallowsEmptyNullAndWhitespace() {
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateScopes("scope", Arrays.asList("a", "")));
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateScopes("scope", Arrays.asList("a", null)));
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateScopes("scope", Arrays.asList("a", "  ")));
+    }
+
+    @Test
+    public void testValidateScopesResultIsImmutable() {
+        SortedSet<String> callerSet = new TreeSet<>(Arrays.asList("a", "b", "c"));
+        Set<String> scopes = ClaimValidationUtils.validateScopes("scope", callerSet);
+
+        assertEquals(3, scopes.size());
+
+        callerSet.add("d");
+        assertEquals(4, callerSet.size());
+        assertTrue(callerSet.contains("d"));
+        assertEquals(3, scopes.size());
+        assertFalse(scopes.contains("d"));
+
+        callerSet.remove("c");
+        assertEquals(3, callerSet.size());
+        assertFalse(callerSet.contains("c"));
+        assertEquals(3, scopes.size());
+        assertTrue(scopes.contains("c"));
+
+        callerSet.clear();
+        assertEquals(0, callerSet.size());
+        assertEquals(3, scopes.size());
+    }
+
+    @Test
+    public void testValidateScopesResultThrowsExceptionOnMutation() {
+        SortedSet<String> callerSet = new TreeSet<>(Arrays.asList("a", "b", "c"));
+        Set<String> scopes = ClaimValidationUtils.validateScopes("scope", callerSet);
+        assertThrows(UnsupportedOperationException.class, scopes::clear);
+    }
+
+    @Test
+    public void testValidateExpiration() {
+        Long expected = 1L;
+        Long actual = ClaimValidationUtils.validateExpiration("exp", expected);
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testValidateExpirationAllowsZero() {
+        Long expected = 0L;
+        Long actual = ClaimValidationUtils.validateExpiration("exp", expected);
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testValidateExpirationDisallowsNull() {
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateExpiration("exp", null));
+    }
+
+    @Test
+    public void testValidateExpirationDisallowsNegatives() {
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateExpiration("exp", -1L));
+    }
+
+    @Test
+    public void testValidateSubject() {
+        String expected = "jdoe";
+        String actual = ClaimValidationUtils.validateSubject("sub", expected);
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testValidateSubjectDisallowsEmptyNullAndWhitespace() {
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateSubject("sub", ""));
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateSubject("sub", null));
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateSubject("sub", "  "));
+    }
+
+    @Test
+    public void testValidateClaimNameOverride() {
+        String expected = "email";
+        String actual = ClaimValidationUtils.validateClaimNameOverride("sub", String.format("  %s  ", expected));
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testValidateClaimNameOverrideDisallowsEmptyNullAndWhitespace() {
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateSubject("sub", ""));
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateSubject("sub", null));
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateSubject("sub", "  "));
+    }
+
+    @Test
+    public void testValidateIssuedAt() {
+        Long expected = 1L;
+        Long actual = ClaimValidationUtils.validateIssuedAt("iat", expected);
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testValidateIssuedAtAllowsZero() {
+        Long expected = 0L;
+        Long actual = ClaimValidationUtils.validateIssuedAt("iat", expected);
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testValidateIssuedAtAllowsNull() {
+        Long expected = null;
+        Long actual = ClaimValidationUtils.validateIssuedAt("iat", expected);
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testValidateIssuedAtDisallowsNegatives() {
+        assertThrows(ValidateException.class, () -> ClaimValidationUtils.validateIssuedAt("iat", -1L));
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtilsTest.java
new file mode 100644
index 0000000..783579a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtilsTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+public class ConfigurationUtilsTest extends OAuthBearerTest {
+
+    private final static String URL_CONFIG_NAME = "url";
+
+    @Test
+    public void testUrl() {
+        testUrl("http://www.example.com");
+    }
+
+    @Test
+    public void testUrlWithSuperfluousWhitespace() {
+        testUrl(String.format("  %s  ", "http://www.example.com"));
+    }
+
+    @Test
+    public void testUrlCaseInsensitivity() {
+        testUrl("HTTPS://WWW.EXAMPLE.COM");
+    }
+
+    @Test
+    public void testUrlFile() {
+        testUrl("file:///tmp/foo.txt");
+    }
+
+    @Test
+    public void testUrlFullPath() {
+        testUrl("https://myidp.example.com/oauth2/default/v1/token");
+    }
+
+    @Test
+    public void testUrlMissingProtocol() {
+        assertThrowsWithMessage(ConfigException.class, () -> testUrl("www.example.com"), "no protocol");
+    }
+
+    @Test
+    public void testUrlInvalidProtocol() {
+        assertThrowsWithMessage(ConfigException.class, () -> testUrl("ftp://ftp.example.com"), "invalid protocol");
+    }
+
+    @Test
+    public void testUrlNull() {
+        assertThrowsWithMessage(ConfigException.class, () -> testUrl(null), "must be non-null");
+    }
+
+    @Test
+    public void testUrlEmptyString() {
+        assertThrowsWithMessage(ConfigException.class, () -> testUrl(""), "must not contain only whitespace");
+    }
+
+    @Test
+    public void testUrlWhitespace() {
+        assertThrowsWithMessage(ConfigException.class, () -> testUrl("    "), "must not contain only whitespace");
+    }
+
+    private void testUrl(String value) {
+        Map<String, Object> configs = Collections.singletonMap(URL_CONFIG_NAME, value);
+        ConfigurationUtils cu = new ConfigurationUtils(configs);
+        cu.validateUrl(URL_CONFIG_NAME);
+    }
+
+    @Test
+    public void testFile() throws IOException {
+        File file = TestUtils.tempFile("some contents!");
+        testFile(file.toURI().toURL().toString());
+    }
+
+    @Test
+    public void testFileWithSuperfluousWhitespace() throws IOException {
+        File file = TestUtils.tempFile();
+        testFile(String.format("  %s  ", file.toURI().toURL()));
+    }
+
+    @Test
+    public void testFileDoesNotExist() {
+        assertThrowsWithMessage(ConfigException.class, () -> testFile(new File("/tmp/not/a/real/file.txt").toURI().toURL().toString()), "that doesn't exist");
+    }
+
+    @Test
+    public void testFileUnreadable() throws IOException {
+        File file = TestUtils.tempFile();
+
+        if (!file.setReadable(false))
+            throw new IllegalStateException(String.format("Can't test file permissions as test couldn't programmatically make temp file %s un-readable", file.getAbsolutePath()));
+
+        assertThrowsWithMessage(ConfigException.class, () -> testFile(file.toURI().toURL().toString()), "that doesn't have read permission");
+    }
+
+    @Test
+    public void testFileNull() {
+        assertThrowsWithMessage(ConfigException.class, () -> testFile(null), "must be non-null");
+    }
+
+    @Test
+    public void testFileEmptyString() {
+        assertThrowsWithMessage(ConfigException.class, () -> testFile(""), "must not contain only whitespace");
+    }
+
+    @Test
+    public void testFileWhitespace() {
+        assertThrowsWithMessage(ConfigException.class, () -> testFile("    "), "must not contain only whitespace");
+    }
+
+    protected void testFile(String value) {
+        Map<String, Object> configs = Collections.singletonMap(URL_CONFIG_NAME, value);
+        ConfigurationUtils cu = new ConfigurationUtils(configs);
+        cu.validateFile(URL_CONFIG_NAME);
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetrieverTest.java
new file mode 100644
index 0000000..de3b463
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetrieverTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.util.Base64;
+import java.util.Random;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+public class HttpAccessTokenRetrieverTest extends OAuthBearerTest {
+
+    @Test
+    public void test() throws IOException {
+        String expectedResponse = "Hiya, buddy";
+        HttpURLConnection mockedCon = createHttpURLConnection(expectedResponse);
+        String response = HttpAccessTokenRetriever.post(mockedCon, null, null, null, null);
+        assertEquals(expectedResponse, response);
+    }
+
+    @Test
+    public void testEmptyResponse() throws IOException {
+        HttpURLConnection mockedCon = createHttpURLConnection("");
+        assertThrows(IOException.class, () -> HttpAccessTokenRetriever.post(mockedCon, null, null, null, null));
+    }
+
+    @Test
+    public void testErrorReadingResponse() throws IOException {
+        HttpURLConnection mockedCon = createHttpURLConnection("dummy");
+        when(mockedCon.getInputStream()).thenThrow(new IOException("Can't read"));
+
+        assertThrows(IOException.class, () -> HttpAccessTokenRetriever.post(mockedCon, null, null, null, null));
+    }
+
+    @Test
+    public void testCopy() throws IOException {
+        byte[] expected = new byte[4096 + 1];
+        Random r = new Random();
+        r.nextBytes(expected);
+        InputStream in = new ByteArrayInputStream(expected);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        HttpAccessTokenRetriever.copy(in, out);
+        assertArrayEquals(expected, out.toByteArray());
+    }
+
+    @Test
+    public void testCopyError() throws IOException {
+        InputStream mockedIn = mock(InputStream.class);
+        OutputStream out = new ByteArrayOutputStream();
+        when(mockedIn.read(any(byte[].class))).thenThrow(new IOException());
+        assertThrows(IOException.class, () -> HttpAccessTokenRetriever.copy(mockedIn, out));
+    }
+
+    @Test
+    public void testParseAccessToken() throws IOException {
+        String expected = "abc";
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode node = mapper.createObjectNode();
+        node.put("access_token", expected);
+
+        String actual = HttpAccessTokenRetriever.parseAccessToken(mapper.writeValueAsString(node));
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testParseAccessTokenEmptyAccessToken() {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode node = mapper.createObjectNode();
+        node.put("access_token", "");
+
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.parseAccessToken(mapper.writeValueAsString(node)));
+    }
+
+    @Test
+    public void testParseAccessTokenMissingAccessToken() {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode node = mapper.createObjectNode();
+        node.put("sub", "jdoe");
+
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.parseAccessToken(mapper.writeValueAsString(node)));
+    }
+
+    @Test
+    public void testParseAccessTokenInvalidJson() {
+        assertThrows(IOException.class, () -> HttpAccessTokenRetriever.parseAccessToken("not valid JSON"));
+    }
+
+    @Test
+    public void testFormatAuthorizationHeader() throws IOException {
+        String expected = "Basic " + Base64.getUrlEncoder().encodeToString(Utils.utf8("id:secret"));
+
+        String actual = HttpAccessTokenRetriever.formatAuthorizationHeader("id", "secret");
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testFormatAuthorizationHeaderMissingValues() {
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader(null, "secret"));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("id", null));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader(null, null));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("", "secret"));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("id", ""));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("", ""));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("  ", "secret"));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("id", "  "));
+        assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("  ", "  "));
+    }
+
+    @Test
+    public void testFormatRequestBody() throws IOException {
+        String expected = "grant_type=client_credentials&scope=scope";
+        String actual = HttpAccessTokenRetriever.formatRequestBody("scope");
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testFormatRequestBodyWithEscaped() throws IOException {
+        String questionMark = "%3F";
+        String exclamationMark = "%21";
+
+        String expected = String.format("grant_type=client_credentials&scope=earth+is+great%s", exclamationMark);
+        String actual = HttpAccessTokenRetriever.formatRequestBody("earth is great!");
+        assertEquals(expected, actual);
+
+        expected = String.format("grant_type=client_credentials&scope=what+on+earth%s%s%s%s%s", questionMark, exclamationMark, questionMark, exclamationMark, questionMark);
+        actual = HttpAccessTokenRetriever.formatRequestBody("what on earth?!?!?");
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testFormatRequestBodyMissingValues() throws IOException {
+        String expected = "grant_type=client_credentials";
+        String actual = HttpAccessTokenRetriever.formatRequestBody(null);
+        assertEquals(expected, actual);
+
+        actual = HttpAccessTokenRetriever.formatRequestBody("");
+        assertEquals(expected, actual);
+
+        actual = HttpAccessTokenRetriever.formatRequestBody("  ");
+        assertEquals(expected, actual);
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/JaasOptionsUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/JaasOptionsUtilsTest.java
new file mode 100644
index 0000000..2b32408
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/JaasOptionsUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.junit.jupiter.api.Test;
+
+public class JaasOptionsUtilsTest extends OAuthBearerTest {
+
+    @Test
+    public void testSSLClientConfig() {
+        Map<String, Object> options = new HashMap<>();
+        String sslKeystore = "test.keystore.jks";
+        String sslTruststore = "test.truststore.jks";
+
+        options.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystore);
+        options.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "$3cr3+");
+        options.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststore);
+
+        JaasOptionsUtils jou = new JaasOptionsUtils(options);
+        Map<String, ?> sslClientConfig = jou.getSslClientConfig();
+        assertNotNull(sslClientConfig);
+        assertEquals(sslKeystore, sslClientConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+        assertEquals(sslTruststore, sslClientConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+        assertEquals(SslConfigs.DEFAULT_SSL_PROTOCOL, sslClientConfig.get(SslConfigs.SSL_PROTOCOL_CONFIG));
+    }
+
+    @Test
+    public void testShouldUseSslClientConfig() throws Exception {
+        JaasOptionsUtils jou = new JaasOptionsUtils(Collections.emptyMap());
+        assertFalse(jou.shouldCreateSSLSocketFactory(new URL("http://example.com")));
+        assertTrue(jou.shouldCreateSSLSocketFactory(new URL("https://example.com")));
+        assertFalse(jou.shouldCreateSSLSocketFactory(new URL("file:///tmp/test.txt")));
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidatorTest.java
new file mode 100644
index 0000000..6fd23f6
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidatorTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+public class LoginAccessTokenValidatorTest extends AccessTokenValidatorTest {
+
+    @Override
+    protected AccessTokenValidator createAccessTokenValidator(AccessTokenBuilder builder) {
+        return new LoginAccessTokenValidator(builder.scopeClaimName(), builder.subjectClaimName());
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandlerTest.java
new file mode 100644
index 0000000..4be823e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandlerTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+public class OAuthBearerLoginCallbackHandlerTest extends OAuthBearerTest {
+
+    @Test
+    public void testHandleTokenCallback() throws Exception {
+        Map<String, ?> configs = getSaslConfigs();
+        AccessTokenBuilder builder = new AccessTokenBuilder();
+        String accessToken = builder.build();
+        AccessTokenRetriever accessTokenRetriever = () -> accessToken;
+
+        OAuthBearerLoginCallbackHandler handler = createHandler(accessTokenRetriever, configs);
+
+        try {
+            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+            handler.handle(new Callback[] {callback});
+
+            assertNotNull(callback.token());
+            OAuthBearerToken token = callback.token();
+            assertEquals(accessToken, token.value());
+            assertEquals(builder.subject(), token.principalName());
+            assertEquals(builder.expirationSeconds() * 1000, token.lifetimeMs());
+            assertEquals(builder.issuedAtSeconds() * 1000, token.startTimeMs());
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testHandleSaslExtensionsCallback() throws Exception {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com");
+        Map<String, Object> jaasConfig = new HashMap<>();
+        jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
+        jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
+        jaasConfig.put("extension_foo", "1");
+        jaasConfig.put("extension_bar", 2);
+        jaasConfig.put("EXTENSION_baz", "3");
+        configureHandler(handler, configs, jaasConfig);
+
+        try {
+            SaslExtensionsCallback callback = new SaslExtensionsCallback();
+            handler.handle(new Callback[]{callback});
+
+            assertNotNull(callback.extensions());
+            Map<String, String> extensions = callback.extensions().map();
+            assertEquals("1", extensions.get("foo"));
+            assertEquals("2", extensions.get("bar"));
+            assertNull(extensions.get("baz"));
+            assertEquals(2, extensions.size());
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testHandleSaslExtensionsCallbackWithInvalidExtension() {
+        String illegalKey = "extension_" + OAuthBearerClientInitialResponse.AUTH_KEY;
+
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com");
+        Map<String, Object> jaasConfig = new HashMap<>();
+        jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
+        jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
+        jaasConfig.put(illegalKey, "this key isn't allowed per OAuthBearerClientInitialResponse.validateExtensions");
+        configureHandler(handler, configs, jaasConfig);
+
+        try {
+            SaslExtensionsCallback callback = new SaslExtensionsCallback();
+            assertThrowsWithMessage(ConfigException.class,
+                () -> handler.handle(new Callback[]{callback}),
+                "Extension name " + OAuthBearerClientInitialResponse.AUTH_KEY + " is invalid");
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testInvalidCallbackGeneratesUnsupportedCallbackException() {
+        Map<String, ?> configs = getSaslConfigs();
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = () -> "foo";
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+        handler.init(accessTokenRetriever, accessTokenValidator);
+
+        try {
+            Callback unsupportedCallback = new Callback() { };
+            assertThrows(UnsupportedCallbackException.class, () -> handler.handle(new Callback[]{unsupportedCallback}));
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testInvalidAccessToken() throws Exception {
+        testInvalidAccessToken("this isn't valid", "Malformed JWT provided");
+        testInvalidAccessToken("this.isn't.valid", "malformed Base64 URL encoded value");
+        testInvalidAccessToken(createAccessKey("this", "isn't", "valid"), "malformed JSON");
+        testInvalidAccessToken(createAccessKey("{}", "{}", "{}"), "exp value must be non-null");
+    }
+
+    @Test
+    public void testMissingAccessToken() {
+        AccessTokenRetriever accessTokenRetriever = () -> {
+            throw new IOException("The token endpoint response access_token value must be non-null");
+        };
+        Map<String, ?> configs = getSaslConfigs();
+        OAuthBearerLoginCallbackHandler handler = createHandler(accessTokenRetriever, configs);
+
+        try {
+            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+            assertThrowsWithMessage(IOException.class,
+                () -> handler.handle(new Callback[]{callback}),
+                "token endpoint response access_token value must be non-null");
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testNotConfigured() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        assertThrowsWithMessage(IllegalStateException.class, () -> handler.handle(new Callback[] {}), "first call the configure or init method");
+    }
+
+    @Test
+    public void testConfigureWithAccessTokenFile() throws Exception {
+        String expected = "{}";
+
+        File tmpDir = createTempDir("access-token");
+        File accessTokenFile = createTempFile(tmpDir, "access-token-", ".json", expected);
+
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, accessTokenFile.toURI().toString());
+        Map<String, Object> jaasConfigs = Collections.emptyMap();
+        configureHandler(handler, configs, jaasConfigs);
+        assertTrue(handler.getAccessTokenRetriever() instanceof FileTokenRetriever);
+    }
+
+    @Test
+    public void testConfigureWithAccessClientCredentials() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com");
+        Map<String, Object> jaasConfigs = new HashMap<>();
+        jaasConfigs.put(CLIENT_ID_CONFIG, "an ID");
+        jaasConfigs.put(CLIENT_SECRET_CONFIG, "a secret");
+        configureHandler(handler, configs, jaasConfigs);
+        assertTrue(handler.getAccessTokenRetriever() instanceof HttpAccessTokenRetriever);
+    }
+
+    private void testInvalidAccessToken(String accessToken, String expectedMessageSubstring) throws Exception {
+        Map<String, ?> configs = getSaslConfigs();
+        OAuthBearerLoginCallbackHandler handler = createHandler(() -> accessToken, configs);
+
+        try {
+            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+            handler.handle(new Callback[]{callback});
+
+            assertNull(callback.token());
+            String actualMessage = callback.errorDescription();
+            assertNotNull(actualMessage);
+            assertTrue(actualMessage.contains(expectedMessageSubstring), String.format(
+                "The error message \"%s\" didn't contain the expected substring \"%s\"",
+                actualMessage, expectedMessageSubstring));
+        } finally {
+            handler.close();
+        }
+    }
+
+    private String createAccessKey(String header, String payload, String signature) {
+        Base64.Encoder enc = Base64.getEncoder();
+        header = enc.encodeToString(Utils.utf8(header));
+        payload = enc.encodeToString(Utils.utf8(payload));
+        signature = enc.encodeToString(Utils.utf8(signature));
+        return String.format("%s.%s.%s", header, payload, signature);
+    }
+
+    private OAuthBearerLoginCallbackHandler createHandler(AccessTokenRetriever accessTokenRetriever, Map<String, ?> configs) {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+        handler.init(accessTokenRetriever, accessTokenValidator);
+        return handler;
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerTest.java
new file mode 100644
index 0000000..6fec08d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.TestJaasConfig;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.function.Executable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public abstract class OAuthBearerTest {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected ObjectMapper mapper = new ObjectMapper();
+
+    protected void assertThrowsWithMessage(Class<? extends Exception> clazz,
+        Executable executable,
+        String substring) {
+        boolean failed = false;
+
+        try {
+            executable.execute();
+        } catch (Throwable t) {
+            failed = true;
+            assertTrue(clazz.isInstance(t), String.format("Test failed by exception %s, but expected %s", t.getClass(), clazz));
+
+            assertErrorMessageContains(t.getMessage(), substring);
+        }
+
+        if (!failed)
+            fail("Expected test to fail with " + clazz + " that contains the string " + substring);
+    }
+
+    protected void assertErrorMessageContains(String actual, String expectedSubstring) {
+        assertTrue(actual.contains(expectedSubstring),
+            String.format("Expected exception message (\"%s\") to contain substring (\"%s\")",
+                actual,
+                expectedSubstring));
+    }
+
+    protected void configureHandler(AuthenticateCallbackHandler handler,
+        Map<String, ?> configs,
+        Map<String, Object> jaasConfig) {
+        TestJaasConfig config = new TestJaasConfig();
+        config.createOrUpdateEntry("KafkaClient", OAuthBearerLoginModule.class.getName(), jaasConfig);
+        AppConfigurationEntry kafkaClient = config.getAppConfigurationEntry("KafkaClient")[0];
+
+        handler.configure(configs,
+            OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+            Collections.singletonList(kafkaClient));
+    }
+
+    protected String createBase64JsonJwtSection(Consumer<ObjectNode> c) {
+        String json = createJsonJwtSection(c);
+
+        try {
+            return Utils.utf8(Base64.getEncoder().encode(Utils.utf8(json)));
+        } catch (Throwable t) {
+            fail(t);
+
+            // Shouldn't get to here...
+            return null;
+        }
+    }
+
+    protected String createJsonJwtSection(Consumer<ObjectNode> c) {
+        ObjectNode node = mapper.createObjectNode();
+        c.accept(node);
+
+        try {
+            return mapper.writeValueAsString(node);
+        } catch (Throwable t) {
+            fail(t);
+
+            // Shouldn't get to here...
+            return null;
+        }
+    }
+
+    protected Retryable<String> createRetryable(Exception[] attempts) {
+        Iterator<Exception> i = Arrays.asList(attempts).iterator();
+
+        return () -> {
+            Exception e = i.hasNext() ? i.next() : null;
+
+            if (e == null) {
+                return "success!";
+            } else {
+                if (e instanceof IOException)
+                    throw new ExecutionException(e);
+                else if (e instanceof RuntimeException)
+                    throw (RuntimeException) e;
+                else
+                    throw new RuntimeException(e);
+            }
+        };
+    }
+
+    protected HttpURLConnection createHttpURLConnection(String response) throws IOException {
+        HttpURLConnection mockedCon = mock(HttpURLConnection.class);
+        when(mockedCon.getURL()).thenReturn(new URL("https://www.example.com"));
+        when(mockedCon.getResponseCode()).thenReturn(200);
+        when(mockedCon.getOutputStream()).thenReturn(new ByteArrayOutputStream());
+        when(mockedCon.getInputStream()).thenReturn(new ByteArrayInputStream(Utils.utf8(response)));
+        return mockedCon;
+    }
+
+    protected File createTempDir(String directory) throws IOException {
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"));
+
+        if (directory != null)
+            tmpDir = new File(tmpDir, directory);
+
+        if (!tmpDir.exists() && !tmpDir.mkdirs())
+            throw new IOException("Could not create " + tmpDir);
+
+        tmpDir.deleteOnExit();
+        log.debug("Created temp directory {}", tmpDir);
+        return tmpDir;
+    }
+
+    protected File createTempFile(File tmpDir,
+        String prefix,
+        String suffix,
+        String contents)
+        throws IOException {
+        File file = File.createTempFile(prefix, suffix, tmpDir);
+        log.debug("Created new temp file {}", file);
+        file.deleteOnExit();
+
+        try (FileWriter writer = new FileWriter(file)) {
+            writer.write(contents);
+        }
+
+        return file;
+    }
+
+    protected Map<String, ?> getSaslConfigs(Map<String, ?> configs) {
+        ConfigDef configDef = new ConfigDef();
+        configDef.withClientSaslSupport();
+        AbstractConfig sslClientConfig = new AbstractConfig(configDef, configs);
+        return sslClientConfig.values();
+    }
+
+    protected Map<String, ?> getSaslConfigs(String name, Object value) {
+        return getSaslConfigs(Collections.singletonMap(name, value));
+    }
+
+    protected Map<String, ?> getSaslConfigs() {
+        return getSaslConfigs(Collections.emptyMap());
+    }
+
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandlerTest.java
new file mode 100644
index 0000000..326197d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+public class OAuthBearerValidatorCallbackHandlerTest extends OAuthBearerTest {
+
+    @Test
+    public void testBasic() throws Exception {
+        String expectedAudience = "a";
+        List<String> allAudiences = Arrays.asList(expectedAudience, "b", "c");
+        AccessTokenBuilder builder = new AccessTokenBuilder().audience(expectedAudience);
+        String accessToken = builder.build();
+
+        Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_EXPECTED_AUDIENCE, allAudiences);
+        OAuthBearerValidatorCallbackHandler handler = createHandler(configs, builder);
+
+        try {
+            OAuthBearerValidatorCallback callback = new OAuthBearerValidatorCallback(accessToken);
+            handler.handle(new Callback[]{callback});
+
+            assertNotNull(callback.token());
+            OAuthBearerToken token = callback.token();
+            assertEquals(accessToken, token.value());
+            assertEquals(builder.subject(), token.principalName());
+            assertEquals(builder.expirationSeconds() * 1000, token.lifetimeMs());
+            assertEquals(builder.issuedAtSeconds() * 1000, token.startTimeMs());
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testInvalidAccessToken() throws Exception {
+        // There aren't different error messages for the validation step, so these are all the
+        // same :(
+        String substring = "invalid_token";
+        assertInvalidAccessTokenFails("this isn't valid", substring);
+        assertInvalidAccessTokenFails("this.isn't.valid", substring);
+        assertInvalidAccessTokenFails(createAccessKey("this", "isn't", "valid"), substring);
+        assertInvalidAccessTokenFails(createAccessKey("{}", "{}", "{}"), substring);
+    }
+
+    private void assertInvalidAccessTokenFails(String accessToken, String expectedMessageSubstring) throws Exception {
+        Map<String, ?> configs = getSaslConfigs();
+        OAuthBearerValidatorCallbackHandler handler = createHandler(configs, new AccessTokenBuilder());
+
+        try {
+            OAuthBearerValidatorCallback callback = new OAuthBearerValidatorCallback(accessToken);
+            handler.handle(new Callback[] {callback});
+
+            assertNull(callback.token());
+            String actualMessage = callback.errorStatus();
+            assertNotNull(actualMessage);
+            assertTrue(actualMessage.contains(expectedMessageSubstring), String.format("The error message \"%s\" didn't contain the expected substring \"%s\"", actualMessage, expectedMessageSubstring));
+        } finally {
+            handler.close();
+        }
+    }
+
+    private OAuthBearerValidatorCallbackHandler createHandler(Map<String, ?> options,
+        AccessTokenBuilder builder) {
+        OAuthBearerValidatorCallbackHandler handler = new OAuthBearerValidatorCallbackHandler();
+        CloseableVerificationKeyResolver verificationKeyResolver = (jws, nestingContext) ->
+                builder.jwk().getRsaPublicKey();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(options, verificationKeyResolver);
+        handler.init(verificationKeyResolver, accessTokenValidator);
+        return handler;
+    }
+
+    private String createAccessKey(String header, String payload, String signature) {
+        Base64.Encoder enc = Base64.getEncoder();
+        header = enc.encodeToString(Utils.utf8(header));
+        payload = enc.encodeToString(Utils.utf8(payload));
+        signature = enc.encodeToString(Utils.utf8(signature));
+        return String.format("%s.%s.%s", header, payload, signature);
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwksTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwksTest.java
new file mode 100644
index 0000000..27711ea
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwksTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.security.oauthbearer.secured.RefreshingHttpsJwks.MISSING_KEY_ID_CACHE_IN_FLIGHT_MS;
+import static org.apache.kafka.common.security.oauthbearer.secured.RefreshingHttpsJwks.MISSING_KEY_ID_MAX_KEY_LENGTH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.jose4j.http.SimpleResponse;
+import org.jose4j.jwk.HttpsJwks;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class RefreshingHttpsJwksTest extends OAuthBearerTest {
+
+    private static final int REFRESH_MS = 5000;
+
+    private static final int RETRY_BACKOFF_MS = 50;
+
+    private static final int RETRY_BACKOFF_MAX_MS = 2000;
+
+    /**
+     * Test that a key not previously scheduled for refresh will be scheduled without a refresh.
+     */
+
+    @Test
+    public void testBasicScheduleRefresh() throws Exception {
+        String keyId = "abc123";
+        Time time = new MockTime();
+        HttpsJwks httpsJwks = spyHttpsJwks();
+
+        try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) {
+            refreshingHttpsJwks.init();
+            verify(httpsJwks, times(1)).refresh();
+            assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+            verify(httpsJwks, times(1)).refresh();
+        }
+    }
+
+    /**
+     * Test that a key previously scheduled for refresh will <b>not</b> be scheduled a second time
+     * if it's requested right away.
+     */
+
+    @Test
+    public void testMaybeExpediteRefreshNoDelay() throws Exception {
+        String keyId = "abc123";
+        Time time = new MockTime();
+        HttpsJwks httpsJwks = spyHttpsJwks();
+
+        try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) {
+            refreshingHttpsJwks.init();
+            assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+            assertFalse(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+        }
+    }
+
+    /**
+     * Test that a key previously scheduled for refresh <b>will</b> be scheduled a second time
+     * if it's requested after the delay.
+     */
+
+    @Test
+    public void testMaybeExpediteRefreshDelays() throws Exception {
+        assertMaybeExpediteRefreshWithDelay(MISSING_KEY_ID_CACHE_IN_FLIGHT_MS - 1, false);
+        assertMaybeExpediteRefreshWithDelay(MISSING_KEY_ID_CACHE_IN_FLIGHT_MS, true);
+        assertMaybeExpediteRefreshWithDelay(MISSING_KEY_ID_CACHE_IN_FLIGHT_MS + 1, true);
+    }
+
+    /**
+     * Test that a "long key" will not be looked up because the key ID is too long.
+     */
+
+    @Test
+    public void testLongKey() throws Exception {
+        char[] keyIdChars = new char[MISSING_KEY_ID_MAX_KEY_LENGTH + 1];
+        Arrays.fill(keyIdChars, '0');
+        String keyId = new String(keyIdChars);
+
+        Time time = new MockTime();
+        HttpsJwks httpsJwks = spyHttpsJwks();
+
+        try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) {
+            refreshingHttpsJwks.init();
+            verify(httpsJwks, times(1)).refresh();
+            assertFalse(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+            verify(httpsJwks, times(1)).refresh();
+        }
+    }
+
+    /**
+     * Test that if we ask to load a missing key, and then we wait past the sleep time that it will
+     * call refresh to load the key.
+     */
+
+    @Test
+    public void testSecondaryRefreshAfterElapsedDelay() throws Exception {
+        String keyId = "abc123";
+        Time time = MockTime.SYSTEM;    // Unfortunately, we can't mock time here because the
+                                        // scheduled executor doesn't respect it.
+        HttpsJwks httpsJwks = spyHttpsJwks();
+
+        try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) {
+            refreshingHttpsJwks.init();
+            verify(httpsJwks, times(1)).refresh();
+            assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+            time.sleep(REFRESH_MS + 1);
+            verify(httpsJwks, times(3)).refresh();
+            assertFalse(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+        }
+    }
+
+    private void assertMaybeExpediteRefreshWithDelay(long sleepDelay, boolean shouldBeScheduled) throws Exception {
+        String keyId = "abc123";
+        Time time = new MockTime();
+        HttpsJwks httpsJwks = spyHttpsJwks();
+
+        try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) {
+            refreshingHttpsJwks.init();
+            assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+            time.sleep(sleepDelay);
+            assertEquals(shouldBeScheduled, refreshingHttpsJwks.maybeExpediteRefresh(keyId));
+        }
+    }
+
+    private RefreshingHttpsJwks getRefreshingHttpsJwks(final Time time, final HttpsJwks httpsJwks) {
+        return new RefreshingHttpsJwks(time, httpsJwks, REFRESH_MS, RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS);
+    }
+
+    /**
+     * We *spy* (not *mock*) the {@link HttpsJwks} instance because we want to have it
+     * _partially mocked_ to determine if it's calling its internal refresh method. We want to
+     * make sure it *doesn't* do that when we call our getJsonWebKeys() method on
+     * {@link RefreshingHttpsJwks}.
+     */
+
+    private HttpsJwks spyHttpsJwks() {
+        HttpsJwks httpsJwks = new HttpsJwks("https://www.example.com");
+
+        SimpleResponse simpleResponse = new SimpleResponse() {
+            @Override
+            public int getStatusCode() {
+                return 200;
+            }
+
+            @Override
+            public String getStatusMessage() {
+                return "OK";
+            }
+
+            @Override
+            public Collection<String> getHeaderNames() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public List<String> getHeaderValues(String name) {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public String getBody() {
+                return "{\"keys\": []}";
+            }
+        };
+
+        httpsJwks.setSimpleHttpGet(l -> simpleResponse);
+
+        return Mockito.spy(httpsJwks);
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/RetryTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/RetryTest.java
new file mode 100644
index 0000000..d04b8c5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/RetryTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+
+public class RetryTest extends OAuthBearerTest {
+
+    @Test
+    public void test() throws ExecutionException {
+        Exception[] attempts = new Exception[] {
+            new IOException("pretend connect error"),
+            new IOException("pretend timeout error"),
+            new IOException("pretend read error"),
+            null    // success!
+        };
+        long retryWaitMs = 1000;
+        long maxWaitMs = 10000;
+        Retryable<String> call = createRetryable(attempts);
+
+        Time time = new MockTime(0, 0, 0);
+        assertEquals(0L, time.milliseconds());
+        Retry<String> r = new Retry<>(time, retryWaitMs, maxWaitMs);
+        r.execute(call);
+
+        long secondWait = retryWaitMs * 2;
+        long thirdWait = retryWaitMs * 4;
+        long totalWait = retryWaitMs + secondWait + thirdWait;
+        assertEquals(totalWait, time.milliseconds());
+    }
+
+    @Test
+    public void testIOExceptionFailure() {
+        Exception[] attempts = new Exception[] {
+            new IOException("pretend connect error"),
+            new IOException("pretend timeout error"),
+            new IOException("pretend read error"),
+            new IOException("pretend another read error"),
+        };
+        long retryWaitMs = 1000;
+        long maxWaitMs = 1000 + 2000 + 3999;
+        Retryable<String> call = createRetryable(attempts);
+
+        Time time = new MockTime(0, 0, 0);
+        assertEquals(0L, time.milliseconds());
+        Retry<String> r = new Retry<>(time, retryWaitMs, maxWaitMs);
+
+        assertThrows(ExecutionException.class, () -> r.execute(call));
+
+        assertEquals(maxWaitMs, time.milliseconds());
+    }
+
+    @Test
+    public void testRuntimeExceptionFailureOnLastAttempt() {
+        Exception[] attempts = new Exception[] {
+            new IOException("pretend connect error"),
+            new IOException("pretend timeout error"),
+            new NullPointerException("pretend JSON node /userId in response is null")
+        };
+        long retryWaitMs = 1000;
+        long maxWaitMs = 10000;
+        Retryable<String> call = createRetryable(attempts);
+
+        Time time = new MockTime(0, 0, 0);
+        assertEquals(0L, time.milliseconds());
+        Retry<String> r = new Retry<>(time, retryWaitMs, maxWaitMs);
+
+        assertThrows(RuntimeException.class, () -> r.execute(call));
+
+        long secondWait = retryWaitMs * 2;
+        long totalWait = retryWaitMs + secondWait;
+        assertEquals(totalWait, time.milliseconds());
+    }
+
+    @Test
+    public void testRuntimeExceptionFailureOnFirstAttempt() {
+        Exception[] attempts = new Exception[] {
+            new NullPointerException("pretend JSON node /userId in response is null"),
+            null
+        };
+        long retryWaitMs = 1000;
+        long maxWaitMs = 10000;
+        Retryable<String> call = createRetryable(attempts);
+
+        Time time = new MockTime(0, 0, 0);
+        assertEquals(0L, time.milliseconds());
+        Retry<String> r = new Retry<>(time, retryWaitMs, maxWaitMs);
+
+        assertThrows(RuntimeException.class, () -> r.execute(call));
+
+        assertEquals(0, time.milliseconds());
+    }
+
+    @Test
+    public void testUseMaxTimeout() throws IOException {
+        Exception[] attempts = new Exception[] {
+            new IOException("pretend connect error"),
+            new IOException("pretend timeout error"),
+            new IOException("pretend read error")
+        };
+        long retryWaitMs = 5000;
+        long maxWaitMs = 5000;
+        Retryable<String> call = createRetryable(attempts);
+
+        Time time = new MockTime(0, 0, 0);
+        assertEquals(0L, time.milliseconds());
+        Retry<String> r = new Retry<>(time, retryWaitMs, maxWaitMs);
+
+        assertThrows(ExecutionException.class, () -> r.execute(call));
+
+        assertEquals(maxWaitMs, time.milliseconds());
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidatorTest.java
new file mode 100644
index 0000000..76333e3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidatorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.util.Collections;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.jose4j.jws.AlgorithmIdentifiers;
+import org.jose4j.jws.JsonWebSignature;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ValidatorAccessTokenValidatorTest extends AccessTokenValidatorTest {
+
+    @Override
+    protected AccessTokenValidator createAccessTokenValidator(AccessTokenBuilder builder) {
+        return new ValidatorAccessTokenValidator(30,
+            Collections.emptySet(),
+            null,
+            (jws, nestingContext) -> builder.jwk().getKey(),
+            builder.scopeClaimName(),
+            builder.subjectClaimName());
+    }
+
+    @Test
+    public void testBasicEncryption() throws Exception {
+        AccessTokenBuilder builder = new AccessTokenBuilder();
+        AccessTokenValidator validator = createAccessTokenValidator(builder);
+
+        JsonWebSignature jws = new JsonWebSignature();
+        jws.setKey(builder.jwk().getPrivateKey());
+        jws.setKeyIdHeaderValue(builder.jwk().getKeyId());
+        jws.setAlgorithmHeaderValue(AlgorithmIdentifiers.RSA_USING_SHA256);
+        String accessToken = builder.build();
+
+        OAuthBearerToken token = validator.validate(accessToken);
+
+        assertEquals(builder.subject(), token.principalName());
+        assertEquals(builder.issuedAtSeconds() * 1000, token.startTimeMs());
+        assertEquals(builder.expirationSeconds() * 1000, token.lifetimeMs());
+        assertEquals(1, token.scope().size());
+    }
+
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3092c29..91eb54f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -267,6 +267,14 @@ object Defaults {
   val SaslLoginRefreshWindowJitter = SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER
   val SaslLoginRefreshMinPeriodSeconds = SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS
   val SaslLoginRefreshBufferSeconds = SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS
+  val SaslLoginRetryBackoffMaxMs = SaslConfigs.DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS
+  val SaslLoginRetryBackoffMs = SaslConfigs.DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS
+  val SaslOAuthBearerScopeClaimName = SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME
+  val SaslOAuthBearerSubClaimName = SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME
+  val SaslOAuthBearerJwksEndpointRefreshMs = SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS
+  val SaslOAuthBearerJwksEndpointRetryBackoffMaxMs = SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS
+  val SaslOAuthBearerJwksEndpointRetryBackoffMs = SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS
+  val SaslOAuthBearerClockSkewSeconds = SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS
 
   /** ********* Delegation Token configuration ***********/
   val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L
@@ -593,6 +601,21 @@ object KafkaConfig {
   val SaslLoginRefreshMinPeriodSecondsProp = SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS
   val SaslLoginRefreshBufferSecondsProp = SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS
 
+  val SaslLoginConnectTimeoutMsProp = SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS
+  val SaslLoginReadTimeoutMsProp = SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS
+  val SaslLoginRetryBackoffMaxMsProp = SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS
+  val SaslLoginRetryBackoffMsProp = SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS
+  val SaslOAuthBearerScopeClaimNameProp = SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME
+  val SaslOAuthBearerSubClaimNameProp = SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME
+  val SaslOAuthBearerTokenEndpointUrlProp = SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL
+  val SaslOAuthBearerJwksEndpointUrlProp = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL
+  val SaslOAuthBearerJwksEndpointRefreshMsProp = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS
+  val SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS
+  val SaslOAuthBearerJwksEndpointRetryBackoffMsProp = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS
+  val SaslOAuthBearerClockSkewSecondsProp = SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS
+  val SaslOAuthBearerExpectedAudienceProp = SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE
+  val SaslOAuthBearerExpectedIssuerProp = SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER
+
   /** ********* Delegation Token Configuration ****************/
   val DelegationTokenSecretKeyAliasProp = "delegation.token.master.key"
   val DelegationTokenSecretKeyProp = "delegation.token.secret.key"
@@ -1004,6 +1027,21 @@ object KafkaConfig {
   val SaslLoginRefreshMinPeriodSecondsDoc = SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
   val SaslLoginRefreshBufferSecondsDoc = SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 
+  val SaslLoginConnectTimeoutMsDoc = SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC
+  val SaslLoginReadTimeoutMsDoc = SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS_DOC
+  val SaslLoginRetryBackoffMaxMsDoc = SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC
+  val SaslLoginRetryBackoffMsDoc = SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS_DOC
+  val SaslOAuthBearerScopeClaimNameDoc = SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC
+  val SaslOAuthBearerSubClaimNameDoc = SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC
+  val SaslOAuthBearerTokenEndpointUrlDoc = SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC
+  val SaslOAuthBearerJwksEndpointUrlDoc = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC
+  val SaslOAuthBearerJwksEndpointRefreshMsDoc = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC
+  val SaslOAuthBearerJwksEndpointRetryBackoffMaxMsDoc = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC
+  val SaslOAuthBearerJwksEndpointRetryBackoffMsDoc = SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC
+  val SaslOAuthBearerClockSkewSecondsDoc = SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC
+  val SaslOAuthBearerExpectedAudienceDoc = SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC
+  val SaslOAuthBearerExpectedIssuerDoc = SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC
+
   /** ********* Delegation Token Configuration ****************/
   val DelegationTokenSecretKeyAliasDoc = s"DEPRECATED: An alias for $DelegationTokenSecretKeyProp, which should be used instead of this config."
   val DelegationTokenSecretKeyDoc = "Secret key to generate and verify delegation tokens. The same key must be configured across all the brokers. " +
@@ -1293,6 +1331,19 @@ object KafkaConfig {
       .define(SaslLoginRefreshWindowJitterProp, DOUBLE, Defaults.SaslLoginRefreshWindowJitter, MEDIUM, SaslLoginRefreshWindowJitterDoc)
       .define(SaslLoginRefreshMinPeriodSecondsProp, SHORT, Defaults.SaslLoginRefreshMinPeriodSeconds, MEDIUM, SaslLoginRefreshMinPeriodSecondsDoc)
       .define(SaslLoginRefreshBufferSecondsProp, SHORT, Defaults.SaslLoginRefreshBufferSeconds, MEDIUM, SaslLoginRefreshBufferSecondsDoc)
+      .define(SaslLoginConnectTimeoutMsProp, INT, null, LOW, SaslLoginConnectTimeoutMsDoc)
+      .define(SaslLoginReadTimeoutMsProp, INT, null, LOW, SaslLoginReadTimeoutMsDoc)
+      .define(SaslLoginRetryBackoffMaxMsProp, LONG, Defaults.SaslLoginRetryBackoffMaxMs, LOW, SaslLoginRetryBackoffMaxMsDoc)
+      .define(SaslLoginRetryBackoffMsProp, LONG, Defaults.SaslLoginRetryBackoffMs, LOW, SaslLoginRetryBackoffMsDoc)
+      .define(SaslOAuthBearerScopeClaimNameProp, STRING, Defaults.SaslOAuthBearerScopeClaimName, LOW, SaslOAuthBearerScopeClaimNameDoc)
+      .define(SaslOAuthBearerSubClaimNameProp, STRING, Defaults.SaslOAuthBearerSubClaimName, LOW, SaslOAuthBearerSubClaimNameDoc)
+      .define(SaslOAuthBearerTokenEndpointUrlProp, STRING, null, MEDIUM, SaslOAuthBearerTokenEndpointUrlDoc)
+      .define(SaslOAuthBearerJwksEndpointUrlProp, STRING, null, MEDIUM, SaslOAuthBearerJwksEndpointUrlDoc)
+      .define(SaslOAuthBearerJwksEndpointRefreshMsProp, LONG, Defaults.SaslOAuthBearerJwksEndpointRefreshMs, LOW, SaslOAuthBearerJwksEndpointRefreshMsDoc)
+      .define(SaslOAuthBearerClockSkewSecondsProp, INT, Defaults.SaslOAuthBearerClockSkewSeconds, LOW, SaslOAuthBearerClockSkewSecondsDoc)
+      .define(SaslOAuthBearerExpectedAudienceProp, LIST, null, LOW, SaslOAuthBearerExpectedAudienceDoc)
+      .define(SaslOAuthBearerExpectedIssuerProp, STRING, null, LOW, SaslOAuthBearerExpectedIssuerDoc)
+
       /** ********* Delegation Token Configuration ****************/
       .define(DelegationTokenSecretKeyAliasProp, PASSWORD, null, MEDIUM, DelegationTokenSecretKeyAliasDoc)
       .define(DelegationTokenSecretKeyProp, PASSWORD, null, MEDIUM, DelegationTokenSecretKeyDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e94a489..2b43806 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -768,6 +768,20 @@ class KafkaConfigTest {
         case KafkaConfig.SaslLoginRefreshWindowJitterProp =>
         case KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp =>
         case KafkaConfig.SaslLoginRefreshBufferSecondsProp =>
+        case KafkaConfig.SaslLoginConnectTimeoutMsProp =>
+        case KafkaConfig.SaslLoginReadTimeoutMsProp =>
+        case KafkaConfig.SaslLoginRetryBackoffMaxMsProp =>
+        case KafkaConfig.SaslLoginRetryBackoffMsProp =>
+        case KafkaConfig.SaslOAuthBearerScopeClaimNameProp =>
+        case KafkaConfig.SaslOAuthBearerSubClaimNameProp =>
+        case KafkaConfig.SaslOAuthBearerTokenEndpointUrlProp =>
+        case KafkaConfig.SaslOAuthBearerJwksEndpointUrlProp =>
+        case KafkaConfig.SaslOAuthBearerJwksEndpointRefreshMsProp =>
+        case KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp =>
+        case KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp =>
+        case KafkaConfig.SaslOAuthBearerClockSkewSecondsProp =>
+        case KafkaConfig.SaslOAuthBearerExpectedAudienceProp =>
+        case KafkaConfig.SaslOAuthBearerExpectedIssuerProp =>
 
         // Security config
         case KafkaConfig.securityProviderClassProp =>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 92cd783..050b2d2 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -80,6 +80,7 @@ versions += [
   jaxrs: "2.1.1",
   jfreechart: "1.0.0",
   jopt: "5.0.4",
+  jose4j: "0.7.8",
   junit: "5.7.1",
   jqwik: "1.5.0",
   kafka_0100: "0.10.0.1",
@@ -151,6 +152,7 @@ libs += [
   jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
   jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+  jose4j: "org.bitbucket.b_c:jose4j:$versions.jose4j",
   junitJupiter: "org.junit.jupiter:junit-jupiter:$versions.junit",
   junitJupiterApi: "org.junit.jupiter:junit-jupiter-api:$versions.junit",
   junitVintageEngine: "org.junit.vintage:junit-vintage-engine:$versions.junit",
diff --git a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
new file mode 100644
index 0000000..e173c49
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_DOC;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_DOC;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetriever;
+import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetrieverFactory;
+import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidator;
+import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidatorFactory;
+import org.apache.kafka.common.security.oauthbearer.secured.CloseableVerificationKeyResolver;
+import org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.secured.VerificationKeyResolverFactory;
+import org.apache.kafka.common.utils.Exit;
+
+public class OAuthCompatibilityTool {
+
+    public static void main(String[] args) {
+        String description = String.format(
+            "This tool is used to verify OAuth/OIDC provider compatibility.%n%n" +
+            "To use, first export KAFKA_OPTS with Java system properties that match%n" +
+            "your OAuth/OIDC configuration. Next, run the following script to%n" +
+            "execute the test:%n%n" +
+            "    ./bin/kafka-run-class.sh %s" +
+            "%n%n" +
+            "Please refer to the following source files for OAuth/OIDC client and%n" +
+            "broker configuration options:" +
+            "%n%n" +
+            "    %s%n" +
+            "    %s",
+            OAuthCompatibilityTool.class.getName(),
+            SaslConfigs.class.getName(),
+            OAuthBearerLoginCallbackHandler.class.getName());
+
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("oauth-compatibility-test")
+            .defaultHelp(true)
+            .description(description);
+
+        parser.addArgument("--connect-timeout-ms")
+            .type(Integer.class)
+            .dest("connectTimeoutMs")
+            .help(SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC);
+        parser.addArgument("--read-timeout-ms")
+            .type(Integer.class)
+            .dest("readTimeoutMs")
+            .help(SASL_LOGIN_READ_TIMEOUT_MS_DOC);
+        parser.addArgument("--login-retry-backoff-ms")
+            .type(Long.class)
+            .dest("loginRetryBackoffMs")
+            .help(SASL_LOGIN_RETRY_BACKOFF_MS_DOC);
+        parser.addArgument("--login-retry-backoff-max-ms")
+            .type(Long.class)
+            .dest("loginRetryBackoffMax")
+            .help(SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC);
+        parser.addArgument("--scope-claim-name")
+            .dest("scopeClaimName")
+            .help(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC);
+        parser.addArgument("--sub-claim-name")
+            .dest("subClaimName")
+            .help(SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC);
+        parser.addArgument("--token-endpoint-url")
+            .dest("tokenEndpointUrl")
+            .help(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC);
+        parser.addArgument("--jwks-endpoint-url")
+            .dest("jwksEndpointUrl")
+            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC);
+        parser.addArgument("--jwks-endpoint-refresh-ms")
+            .type(Long.class)
+            .dest("jwksEndpointRefreshMs")
+            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC);
+        parser.addArgument("--jwks-endpoint-retry-backoff-max-ms")
+            .type(Long.class)
+            .dest("jwksEndpointRetryBackoffMaxMs")
+            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC);
+        parser.addArgument("--jwks-endpoint-retry-backoff-ms")
+            .type(Long.class)
+            .dest("jwksEndpointRetryBackoffMs")
+            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC);
+        parser.addArgument("--clock-skew-seconds")
+            .type(Integer.class)
+            .dest("clockSkewSeconds")
+            .help(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC);
+        parser.addArgument("--expected-audience")
+            .dest("expectedAudience")
+            .help(SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC);
+        parser.addArgument("--expected-issuer")
+            .dest("expectedIssuer")
+            .help(SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
+
+        parser.addArgument("--client-id")
+            .dest("clientId")
+            .help(CLIENT_ID_DOC);
+        parser.addArgument("--client-secret")
+            .dest("clientSecret")
+            .help(CLIENT_SECRET_DOC);
+        parser.addArgument("--scope")
+            .dest("scope")
+            .help(SCOPE_DOC);
+
+        Namespace namespace;
+
+        try {
+            namespace = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            Exit.exit(1);
+            return;
+        }
+
+        Map<String, ?> configs = getConfigs(namespace);
+        Map<String, Object> jaasConfigs = getJaasConfigs(namespace);
+
+        try {
+            String accessToken;
+
+            {
+                // Client side...
+                try (AccessTokenRetriever atr = AccessTokenRetrieverFactory.create(configs, jaasConfigs)) {
+                    atr.init();
+                    AccessTokenValidator atv = AccessTokenValidatorFactory.create(configs);
+                    System.out.println("PASSED 1/5: client configuration");
+
+                    accessToken = atr.retrieve();
+                    System.out.println("PASSED 2/5: client JWT retrieval");
+
+                    atv.validate(accessToken);
+                    System.out.println("PASSED 3/5: client JWT validation");
+                }
+            }
+
+            {
+                // Broker side...
+                try (CloseableVerificationKeyResolver vkr = VerificationKeyResolverFactory.create(configs, jaasConfigs)) {
+                    vkr.init();
+                    AccessTokenValidator atv = AccessTokenValidatorFactory.create(configs, vkr);
+                    System.out.println("PASSED 4/5: broker configuration");
+
+                    atv.validate(accessToken);
+                    System.out.println("PASSED 5/5: broker JWT validation");
+                }
+            }
+
+            System.out.println("SUCCESS");
+            Exit.exit(0);
+        } catch (Throwable t) {
+            System.out.println("FAILED:");
+            t.printStackTrace();
+
+            if (t instanceof ConfigException) {
+                System.out.printf("%n");
+                parser.printHelp();
+            }
+
+            Exit.exit(1);
+        }
+    }
+
+    private static Map<String, ?> getConfigs(Namespace namespace) {
+        Map<String, Object> c = new HashMap<>();
+        maybeAddInt(namespace, "connectTimeoutMs", c, SASL_LOGIN_CONNECT_TIMEOUT_MS);
+        maybeAddInt(namespace, "readTimeoutMs", c, SASL_LOGIN_READ_TIMEOUT_MS);
+        maybeAddLong(namespace, "loginRetryBackoffMs", c, SASL_LOGIN_RETRY_BACKOFF_MS);
+        maybeAddLong(namespace, "loginRetryBackoffMax", c, SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
+        maybeAddString(namespace, "scopeClaimName", c, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
+        maybeAddString(namespace, "subClaimName", c, SASL_OAUTHBEARER_SUB_CLAIM_NAME);
+        maybeAddString(namespace, "tokenEndpointUrl", c, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
+        maybeAddString(namespace, "jwksEndpointUrl", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
+        maybeAddLong(namespace, "jwksEndpdointRefreshMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
+        maybeAddLong(namespace, "jwksEndpdointRetryBackoffMaxMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS);
+        maybeAddLong(namespace, "jwksEndpdointRetryBackoffMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS);
+        maybeAddInt(namespace, "clockSkewSeconds", c, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS);
+        maybeAddStringList(namespace, "expectedAudience", c, SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
+        maybeAddString(namespace, "expectedIssuer", c, SASL_OAUTHBEARER_EXPECTED_ISSUER);
+
+        // This here is going to fill in all the defaults for the values we don't specify...
+        ConfigDef cd = new ConfigDef();
+        SaslConfigs.addClientSaslSupport(cd);
+        AbstractConfig config = new AbstractConfig(cd, c);
+        return config.values();
+    }
+
+    private static void maybeAddInt(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
+        Integer value = namespace.getInt(namespaceKey);
+
+        if (value != null)
+            configs.put(configsKey, value);
+    }
+
+    private static void maybeAddLong(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
+        Long value = namespace.getLong(namespaceKey);
+
+        if (value != null)
+            configs.put(configsKey, value);
+    }
+
+    private static void maybeAddString(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
+        String value = namespace.getString(namespaceKey);
+
+        if (value != null)
+            configs.put(configsKey, value);
+    }
+
+    private static void maybeAddStringList(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
+        String value = namespace.getString(namespaceKey);
+
+        if (value != null)
+            configs.put(configsKey, Arrays.asList(value.split(",")));
+    }
+
+    private static Map<String, Object> getJaasConfigs(Namespace namespace) {
+        Map<String, Object> c = new HashMap<>();
+        c.put(CLIENT_ID_CONFIG, namespace.getString("clientId"));
+        c.put(CLIENT_SECRET_CONFIG, namespace.getString("clientSecret"));
+        c.put(SCOPE_CONFIG, namespace.getString("scope"));
+        return c;
+    }
+
+}