Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/20 22:33:45 UTC

[GitHub] [kafka] junrao commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

junrao commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r731343661



##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ConfigurationUtils</code> is a utility class to perform basic configuration-related
+ * logic and is separated out here for easier, more direct testing.
+ */
+
+public class ConfigurationUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class);
+
+    private final Map<String, ?> configs;
+
+    private final String prefix;
+
+    public ConfigurationUtils(Map<String, ?> configs) {
+        this(configs, null);
+    }
+
+    public ConfigurationUtils(Map<String, ?> configs, String saslMechanism) {

Review comment:
       saslMechanism seems to be used to build the listener prefix.
   (1) Could we rename it to reflect that it is a listener name?
   (2) The listener name is only relevant for the broker. Could we avoid passing it in for the client?
   (3) The listener name is not always the SASL mechanism name. Should we pass in the listener name instead?
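
   To illustrate points (2) and (3): broker-side overrides are keyed as `listener.name.<listener>.<mechanism>.<config>`, so the listener name and the SASL mechanism are separate prefix segments. Below is a minimal, self-contained sketch of that layout. The class `PrefixSketch` and its method names are hypothetical stand-ins for illustration, not Kafka's `ListenerName` API.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; it mirrors the key layout, not Kafka's API.
class PrefixSketch {

    // Builds "listener.name.<listener>."; only meaningful on the broker.
    static String listenerPrefix(String listenerName) {
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".";
    }

    // Builds "<mechanism>.", for example "oauthbearer.".
    static String mechanismPrefix(String saslMechanism) {
        return saslMechanism.toLowerCase(Locale.ROOT) + ".";
    }

    // A client passes a null (or blank) listener name and gets the bare config name back.
    static String configKey(String listenerName, String saslMechanism, String name) {
        if (listenerName == null || listenerName.trim().isEmpty())
            return name;

        return listenerPrefix(listenerName.trim()) + mechanismPrefix(saslMechanism) + name;
    }
}
```

   With this shape, a broker listener named `SASL_SSL` using `OAUTHBEARER` resolves `sasl.jaas.config` under `listener.name.sasl_ssl.oauthbearer.`, while a client lookup is left unprefixed.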

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>HttpAccessTokenRetriever</code> is an {@link AccessTokenRetriever} that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+    private static final Set<Integer> UNRETRYABLE_HTTP_CODES;
+
+    public static final String AUTHORIZATION_HEADER = "Authorization";
+
+    static {
+        // This does not have to be an exhaustive list. There are other HTTP codes that
+        // are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585)
+        // that we won't worry about yet. The worst case if a status code is missing from
+        // this set is that the request will be retried.
+        UNRETRYABLE_HTTP_CODES = new HashSet<>();
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+    }
+
+    private final String clientId;
+
+    private final String clientSecret;
+
+    private final String scope;
+
+    private final SSLSocketFactory sslSocketFactory;
+
+    private final String tokenEndpointUri;
+
+    private final long loginRetryBackoffMs;
+
+    private final long loginRetryBackoffMaxMs;
+
+    private final Integer loginConnectTimeoutMs;
+
+    private final Integer loginReadTimeoutMs;
+
+    public HttpAccessTokenRetriever(String clientId,
+        String clientSecret,
+        String scope,
+        SSLSocketFactory sslSocketFactory,
+        String tokenEndpointUri,
+        long loginRetryBackoffMs,
+        long loginRetryBackoffMaxMs,
+        Integer loginConnectTimeoutMs,
+        Integer loginReadTimeoutMs) {
+        this.clientId = Objects.requireNonNull(clientId);
+        this.clientSecret = Objects.requireNonNull(clientSecret);
+        this.scope = scope;
+        this.sslSocketFactory = sslSocketFactory;
+        this.tokenEndpointUri = Objects.requireNonNull(tokenEndpointUri);
+        this.loginRetryBackoffMs = loginRetryBackoffMs;
+        this.loginRetryBackoffMaxMs = loginRetryBackoffMaxMs;
+        this.loginConnectTimeoutMs = loginConnectTimeoutMs;
+        this.loginReadTimeoutMs = loginReadTimeoutMs;
+    }
+
+    /**
+     * Retrieves a JWT access token in its serialized three-part form. The implementation
+     * is free to determine how it should be retrieved but should not perform validation
+     * on the result.
+     *
+     * <b>Note</b>: This is a blocking function and callers should be aware that the
+     * implementation communicates over a network. The facility in the
+     * {@link javax.security.auth.spi.LoginModule} from which this is ultimately called
+     * does not provide an asynchronous approach.
+     *
+     * @return Non-<code>null</code> JWT access token string
+     *
+     * @throws IOException Thrown on errors related to IO during retrieval
+     */
+
+    @Override
+    public String retrieve() throws IOException {
+        String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
+        String requestBody = formatRequestBody(scope);
+
+        Retry<String> retry = new Retry<>(Time.SYSTEM,
+            loginRetryBackoffMs,
+            loginRetryBackoffMaxMs);
+
+        Map<String, String> headers = Collections.singletonMap(AUTHORIZATION_HEADER, authorizationHeader);
+
+        String responseBody = retry.execute(() -> {
+            HttpURLConnection con = (HttpURLConnection) new URL(tokenEndpointUri).openConnection();
+
+            if (sslSocketFactory != null && con instanceof HttpsURLConnection)
+                ((HttpsURLConnection) con).setSSLSocketFactory(sslSocketFactory);
+
+            try {
+                return post(con, headers, requestBody, loginConnectTimeoutMs, loginReadTimeoutMs);
+            } finally {
+                con.disconnect();
+            }
+        });
+        log.debug("retrieve - responseBody: {}", responseBody);
+
+        return parseAccessToken(responseBody);
+    }
+
+    public static String post(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        handleInput(con, headers, requestBody, connectTimeoutMs, readTimeoutMs);
+        return handleOutput(con);
+    }
+
+    private static void handleInput(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        log.debug("handleInput - starting post for {}", con.getURL());
+        con.setRequestMethod("POST");
+        con.setRequestProperty("Accept", "application/json");
+
+        if (headers != null) {
+            for (Map.Entry<String, String> header : headers.entrySet())
+                con.setRequestProperty(header.getKey(), header.getValue());
+        }
+
+        con.setRequestProperty("Cache-Control", "no-cache");
+
+        if (requestBody != null) {
+            con.setRequestProperty("Content-Length", String.valueOf(requestBody.length()));
+            con.setDoOutput(true);
+        }
+
+        con.setUseCaches(false);
+
+        if (connectTimeoutMs != null)
+            con.setConnectTimeout(connectTimeoutMs);
+
+        if (readTimeoutMs != null)
+            con.setReadTimeout(readTimeoutMs);
+
+        log.debug("handleInput - preparing to connect to {}", con.getURL());
+        con.connect();
+
+        if (requestBody != null) {
+            try (OutputStream os = con.getOutputStream()) {
+                ByteArrayInputStream is = new ByteArrayInputStream(requestBody.getBytes(
+                    StandardCharsets.UTF_8));
+                log.debug("handleInput - preparing to write request body to {}", con.getURL());
+                copy(is, os);
+            }
+        }
+    }
+
+    static String handleOutput(final HttpURLConnection con) throws IOException {
+        int responseCode = con.getResponseCode();
+        log.debug("handleOutput - responseCode: {}", responseCode);
+
+        String responseBody = null;
+
+        try (InputStream is = con.getInputStream()) {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            log.debug("handleOutput - preparing to read response body from {}", con.getURL());
+            copy(is, os);
+            responseBody = os.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
+            log.warn("handleOutput - error retrieving data", e);
+        }
+
+        if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+            if (responseBody == null || responseBody.isEmpty())
+                throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %s from %s", responseCode, con.getURL()));
+
+            log.debug("handleOutput - responseCode: {}, response: {}", responseCode, responseBody);
+
+            return responseBody;
+        } else {
+            log.warn("handleOutput - error response code: {}, error response body: {}", responseCode, responseBody);
+
+            if (UNRETRYABLE_HTTP_CODES.contains(responseCode)) {
+                // We know that this is a non-transient error, so let's not keep retrying the
+                // request unnecessarily.
+                throw new UnretryableException(new IOException(String.format("The response code %s was encountered reading the token endpoint response; will not attempt further retries", responseCode)));
+            } else {
+                // We don't know if this is a transient (retryable) error or not, so let's assume
+                // it is.
+                throw new IOException(String.format("The unexpected response code %s was encountered reading the token endpoint response", responseCode));
+            }
+        }
+    }
+
+    static void copy(InputStream is, OutputStream os) throws IOException {
+        byte[] buf = new byte[4096];
+        int b;
+
+        while ((b = is.read(buf)) != -1)
+            os.write(buf, 0, b);
+    }
+
+    static String parseAccessToken(String responseBody) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode rootNode = mapper.readTree(responseBody);
+        JsonNode accessTokenNode = rootNode.at("/access_token");
+
+        if (accessTokenNode == null)
+            throw new IOException("The token endpoint response did not contain an access_token value");
+
+        return sanitizeString("The token endpoint response access_token", accessTokenNode.textValue());
+    }
+
+    static String formatAuthorizationHeader(String clientId, String clientSecret) throws IOException {
+        clientId = sanitizeString("The token endpoint request clientId", clientId);
+        clientSecret = sanitizeString("The token endpoint request clientSecret", clientSecret);
+
+        String s = String.format("%s:%s", clientId, clientSecret);
+        String encoded = Base64.getEncoder().encodeToString(Utils.utf8(s));
+        return String.format("Basic %s", encoded);
+    }
+
+    static String formatRequestBody(String scope) throws IOException {
+        try {
+            StringBuilder requestParameters = new StringBuilder();
+            requestParameters.append("grant_type=client_credentials");
+
+            if (scope != null && !scope.trim().isEmpty()) {
+                scope = scope.trim();
+                String encodedScope = URLEncoder.encode(scope, StandardCharsets.UTF_8.name());
+                requestParameters.append("&scope=").append(encodedScope);
+            }
+
+            return requestParameters.toString();
+        } catch (UnsupportedEncodingException e) {
+            // The world has gone crazy!
+            throw new IOException(String.format("Encoding %s not supported", StandardCharsets.UTF_8.name()));
+        }
+    }
+
+    private static String sanitizeString(String name, String value) throws IOException {
+        if (value == null)
+            throw new IOException(String.format("%s value must be non-null", name));
+
+        if (value.isEmpty())
+            throw new IOException(String.format("%s value must be non-empty", name));
+
+        value = value.trim();
+
+        if (value.isEmpty())
+            throw new IOException(String.format("%s value must not contain only whitespace", name));

Review comment:
       name -> value ?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/PemDirectoryVerificationKeyResolver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.PublicKey;
+import java.security.interfaces.ECPublicKey;
+import java.security.interfaces.RSAPublicKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.common.utils.Utils;
+import org.jose4j.jwk.EllipticCurveJsonWebKey;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.jwk.RsaJsonWebKey;
+import org.jose4j.jws.JsonWebSignature;
+import org.jose4j.jwx.JsonWebStructure;
+import org.jose4j.keys.EcKeyUtil;
+import org.jose4j.keys.RsaKeyUtil;
+import org.jose4j.keys.resolvers.JwksVerificationKeyResolver;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.jose4j.lang.JoseException;
+import org.jose4j.lang.UnresolvableKeyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>PemVerificationKeyResolver</code> is a {@link VerificationKeyResolver} implementation

Review comment:
       So, PemVerificationKeyResolver is only used for testing and not for production use?

##########
File path: build.gradle
##########
@@ -1604,6 +1608,7 @@ project(':tools') {
     implementation libs.slf4jApi
     implementation libs.log4j
 
+    implementation libs.jose4j

Review comment:
       Is this needed since tools depends on clients, which already has this dependency?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ConfigurationUtils</code> is a utility class to perform basic configuration-related
+ * logic and is separated out here for easier, more direct testing.
+ */
+
+public class ConfigurationUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class);
+
+    private final Map<String, ?> configs;
+
+    private final String prefix;
+
+    public ConfigurationUtils(Map<String, ?> configs) {
+        this(configs, null);
+    }
+
+    public ConfigurationUtils(Map<String, ?> configs, String saslMechanism) {
+        this.configs = configs;
+
+        if (saslMechanism != null && !saslMechanism.trim().isEmpty())
+            this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim());
+        else
+            this.prefix = null;
+    }
+
+    public Map<String, ?> getSslClientConfig(String uriConfigName) {
+        String urlConfigValue = get(uriConfigName);
+
+        if (urlConfigValue == null || urlConfigValue.trim().isEmpty())
+            throw new ConfigException(String.format("The OAuth configuration option %s is required", uriConfigName));
+
+        URL url;
+
+        try {
+            url = new URL(urlConfigValue);
+        } catch (IOException e) {
+            throw new ConfigException(String.format("The OAuth configuration option %s was not a valid URL (%s)", uriConfigName, urlConfigValue));
+        }
+
+        if (!url.getProtocol().equalsIgnoreCase("https")) {
+            log.warn("Not creating SSL socket factory as URL for {} ({}) is not SSL-/TLS-based", uriConfigName, url);
+            return null;
+        }
+
+        ConfigDef sslConfigDef = new ConfigDef();
+        sslConfigDef.withClientSslSupport();
+        AbstractConfig sslClientConfig = new AbstractConfig(sslConfigDef, configs);
+        return sslClientConfig.values();
+    }
+
+    public SSLSocketFactory createSSLSocketFactory(String uriConfigName) {
+        Map<String, ?> sslClientConfig = getSslClientConfig(uriConfigName);
+
+        if (sslClientConfig == null) {
+            log.warn("Requesting SSL client socket factory but SSL configs were null");
+            return null;
+        }
+
+        SslFactory sslFactory = new SslFactory(Mode.CLIENT);
+        sslFactory.configure(sslClientConfig);
+        SSLSocketFactory socketFactory = ((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext().getSocketFactory();
+        log.debug("Created SSLSocketFactory: {}", sslClientConfig);
+        return socketFactory;
+    }
+
+    /**
+     * Validates that, if a value is supplied, it is a file that:
+     *
+     * <ul>
+     *     <li>exists</li>
+     *     <li>has read permission</li>
+     *     <li>points to a file</li>
+     * </ul>
+     *
+     * If the value is null or an empty string, it is assumed to be an "empty" value and thus
+     * ignored. Any whitespace is trimmed off of the beginning and end.
+     */
+
+    public Path validateFile(String name) {
+        URI uri = validateUri(name);
+        File file = new File(uri.getRawPath()).getAbsoluteFile();
+
+        if (!file.exists())
+            throw new ConfigException(name, file, String.format("The OAuth configuration option %s contains a file (%s) that doesn't exist", name, file));
+
+        if (!file.canRead())
+            throw new ConfigException(name, file, String.format("The OAuth configuration option %s contains a file (%s) that doesn't have read permission", name, file));
+
+        if (file.isDirectory())
+            throw new ConfigException(name, file, String.format("The OAuth configuration option %s references a directory (%s), not a file", name, file));
+
+        return file.toPath();
+    }
+
+    /**
+     * Validates that, if a value is supplied, it is a value that:
+     *
+     * <ul>
+     *     <li>is an Integer</li>
+     *     <li>has a value that is not less than the provided minimum value</li>
+     * </ul>
+     *
+     * If the value is null or an empty string, it is assumed to be an "empty" value and thus
+     * ignored. Any whitespace is trimmed off of the beginning and end.
+     */
+
+    public Integer validateInteger(String name) {
+        return validateInteger(name, true);
+    }
+
+    public Integer validateInteger(String name, boolean isRequired) {
+        return validateInteger(name, isRequired, null);
+    }
+
+    public Integer validateInteger(String name, boolean isRequired, Integer min) {
+        Integer value = get(name);
+
+        if (value == null) {
+            if (isRequired)
+                throw new ConfigException(name, null, String.format("The OAuth configuration option %s must be non-null", name));
+            else
+                return null;
+        }
+
+        if (min != null && value < min)
+            throw new ConfigException(name, value, String.format("The OAuth configuration option %s value must be at least %s", name, min));
+
+        return value;
+    }
+
+    /**
+     * Validates that, if a value is supplied, it is a value that:
+     *
+     * <ul>
+     *     <li>is a Long</li>
+     *     <li>has a value that is not less than the provided minimum value</li>
+     * </ul>
+     *
+     * If the value is null or an empty string, it is assumed to be an "empty" value and thus
+     * ignored. Any whitespace is trimmed off of the beginning and end.
+     */
+
+    public Long validateLong(String name) {
+        return validateLong(name, true);
+    }
+
+    public Long validateLong(String name, boolean isRequired) {
+        return validateLong(name, isRequired, null);
+    }
+
+    public Long validateLong(String name, boolean isRequired, Long min) {
+        Long value = get(name);
+
+        if (value == null) {
+            if (isRequired)
+                throw new ConfigException(name, null, String.format("The OAuth configuration option %s must be non-null", name));
+            else
+                return null;
+        }
+
+        if (min != null && value < min)
+            throw new ConfigException(name, value, String.format("The OAuth configuration option %s value must be at least %s", name, min));
+
+        return value;
+    }
+
+    /**
+     * Validates that the configured URI:
+     *
+     * <ul>
+     *     <li>is well-formed</li>
+     *     <li>contains a scheme</li>
+     *     <li>uses either HTTP, HTTPS, or file protocols</li>
+     * </ul>
+     *
+     * No effort is made to contact the URL in the validation step.
+     */
+
+    public URI validateUri(String name) {
+        String value = validateString(name);
+        URI uri;
+
+        try {
+            uri = new URI(value.trim());

Review comment:
       value is already trimmed in validateString(), so there is no need to trim it again.
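
   A minimal sketch of the single-trim flow, assuming (as noted above) that the string validation step returns an already-trimmed value. The class `UriValidationSketch`, its method signatures, and the use of `IllegalArgumentException` are simplifications for illustration; the PR's own methods read from the config map and throw `ConfigException`.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical, simplified stand-ins for the validation helpers under review.
class UriValidationSketch {

    // Rejects null or blank input and returns the trimmed value exactly once.
    static String validateString(String name, String value) {
        if (value == null)
            throw new IllegalArgumentException(name + " must be non-null");

        String trimmed = value.trim();

        if (trimmed.isEmpty())
            throw new IllegalArgumentException(name + " must not be empty or whitespace");

        return trimmed;
    }

    // Relies on validateString() for trimming, so no second trim() is needed here.
    static URI validateUri(String name, String value) {
        String trimmed = validateString(name, value);

        try {
            return new URI(trimmed);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(name + " is not a valid URI: " + trimmed, e);
        }
    }
}
```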

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ConfigurationUtils</code> is a utility class to perform basic configuration-related
+ * logic and is separated out here for easier, more direct testing.
+ */
+
+public class ConfigurationUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class);
+
+    private final Map<String, ?> configs;
+
+    private final String prefix;
+
+    public ConfigurationUtils(Map<String, ?> configs) {
+        this(configs, null);
+    }
+
+    public ConfigurationUtils(Map<String, ?> configs, String saslMechanism) {
+        this.configs = configs;
+
+        if (saslMechanism != null && !saslMechanism.trim().isEmpty())
+            this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim());
+        else
+            this.prefix = null;
+    }
+
+    public Map<String, ?> getSslClientConfig(String uriConfigName) {
+        String urlConfigValue = get(uriConfigName);
+
+        if (urlConfigValue == null || urlConfigValue.trim().isEmpty())
+            throw new ConfigException(String.format("The OAuth configuration option %s is required", uriConfigName));
+
+        URL url;
+
+        try {
+            url = new URL(urlConfigValue);
+        } catch (IOException e) {
+            throw new ConfigException(String.format("The OAuth configuration option %s was not a valid URL (%s)", uriConfigName, urlConfigValue));
+        }
+
+        if (!url.getProtocol().equalsIgnoreCase("https")) {
+            log.warn("Not creating SSL socket factory as URL for {} ({}) is not SSL-/TLS-based", uriConfigName, url);
+            return null;
+        }
+
+        ConfigDef sslConfigDef = new ConfigDef();
+        sslConfigDef.withClientSslSupport();

Review comment:
       Hmm, these are the SSL properties for connections between the Kafka client and the Kafka broker. Here, we need the SSL properties for connecting to the OAuth provider, which may require a different set of configurations.
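
   One possible way to keep the two sets apart is to read the provider's SSL settings from a dedicated prefix and strip that prefix before handing them to the SSL factory. A minimal sketch follows. The prefix `oauth.provider.` and the class `ProviderSslConfigSketch` are hypothetical and chosen only for this example; they are not existing Kafka configuration names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: selects SSL settings intended for the OAuth provider connection.
class ProviderSslConfigSketch {

    // Assumed prefix for illustration; not an existing Kafka config namespace.
    static final String PROVIDER_PREFIX = "oauth.provider.";

    // Returns only the "oauth.provider.ssl.*" entries, with the provider prefix removed,
    // so that broker-facing "ssl.*" entries are never reused for the provider connection.
    static Map<String, Object> providerSslConfigs(Map<String, ?> configs) {
        Map<String, Object> result = new HashMap<>();

        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            String key = entry.getKey();

            if (key.startsWith(PROVIDER_PREFIX + "ssl."))
                result.put(key.substring(PROVIDER_PREFIX.length()), entry.getValue());
        }

        return result;
    }
}
```

   For example, given both `ssl.truststore.location` (broker-facing) and `oauth.provider.ssl.truststore.location`, only the latter would be returned, keyed as `ssl.truststore.location`.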

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler {

Review comment:
       Could we add a high-level description of the class?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+    public static final String CLIENT_ID_CONFIG = "clientId";
+    public static final String CLIENT_SECRET_CONFIG = "clientSecret";
+    public static final String SCOPE_CONFIG = "scope";
+
+    public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " +
+        "client ID to uniquely identify the service account to use for authentication for " +
+        "this client. The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " +
+        "value and is provided to the OAuth provider using the OAuth " +
+        "clientcredentials grant type.";
+
+    public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " +
+        "client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " +
+        "account and identifies the service account to use for authentication for " +
+        "this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " +
+        "value and is provided to the OAuth provider using the OAuth " +
+        "clientcredentials grant type.";
+
+    public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " +
+        "token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need to specify an " +
+        "OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " +
+        "include with the login request.";
+
+    private static final String EXTENSION_PREFIX = "extension_";
+
+    private Map<String, Object> moduleOptions;
+
+    private AccessTokenRetriever accessTokenRetriever;
+
+    private AccessTokenValidator accessTokenValidator;
+
+    private boolean isConfigured = false;
+
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size()));
+
+        moduleOptions = Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+        AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, saslMechanism, moduleOptions);
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism);
+        configure(accessTokenRetriever, accessTokenValidator);
+    }
+
+    public void configure(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator accessTokenValidator) {
+        this.accessTokenRetriever = accessTokenRetriever;
+        this.accessTokenValidator = accessTokenValidator;
+
+        try {
+            this.accessTokenRetriever.init();
+        } catch (IOException e) {
+            throw new KafkaException("The OAuth login configuration encountered an error when initializing the AccessTokenRetriever", e);
+        }
+
+        isConfigured = true;
+    }
+
+    @Override
+    public void close() {
+        if (accessTokenRetriever != null) {
+            try {
+                this.accessTokenRetriever.close();
+            } catch (IOException e) {
+                log.warn("The OAuth login configuration encountered an error when closing the AccessTokenRetriever", e);
+            }
+        }
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        checkConfigured();
+
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerTokenCallback) {
+                handle((OAuthBearerTokenCallback) callback);
+            } else if (callback instanceof SaslExtensionsCallback) {
+                handle((SaslExtensionsCallback) callback);
+            } else {
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    void handle(OAuthBearerTokenCallback callback) throws IOException {

Review comment:
       Could this method and the next one be private?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>HttpAccessTokenRetriever</code> is an {@link AccessTokenRetriever} that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+    private static final Set<Integer> UNRETRYABLE_HTTP_CODES;
+
+    public static final String AUTHORIZATION_HEADER = "Authorization";
+
+    static {
+        // This does not have to be an exhaustive list. There are other HTTP codes that
+        // are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585)
+        // that we won't worry about yet. The worst case if a status code is missing from
+        // this set is that the request will be retried.
+        UNRETRYABLE_HTTP_CODES = new HashSet<>();
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+    }
+
+    private final String clientId;
+
+    private final String clientSecret;
+
+    private final String scope;
+
+    private final SSLSocketFactory sslSocketFactory;
+
+    private final String tokenEndpointUri;
+
+    private final long loginRetryBackoffMs;
+
+    private final long loginRetryBackoffMaxMs;
+
+    private final Integer loginConnectTimeoutMs;
+
+    private final Integer loginReadTimeoutMs;
+
+    public HttpAccessTokenRetriever(String clientId,
+        String clientSecret,
+        String scope,
+        SSLSocketFactory sslSocketFactory,
+        String tokenEndpointUri,
+        long loginRetryBackoffMs,
+        long loginRetryBackoffMaxMs,
+        Integer loginConnectTimeoutMs,
+        Integer loginReadTimeoutMs) {
+        this.clientId = Objects.requireNonNull(clientId);
+        this.clientSecret = Objects.requireNonNull(clientSecret);
+        this.scope = scope;
+        this.sslSocketFactory = sslSocketFactory;
+        this.tokenEndpointUri = Objects.requireNonNull(tokenEndpointUri);
+        this.loginRetryBackoffMs = loginRetryBackoffMs;
+        this.loginRetryBackoffMaxMs = loginRetryBackoffMaxMs;
+        this.loginConnectTimeoutMs = loginConnectTimeoutMs;
+        this.loginReadTimeoutMs = loginReadTimeoutMs;
+    }
+
+    /**
+     * Retrieves a JWT access token in its serialized three-part form. The implementation
+     * is free to determine how it should be retrieved but should not perform validation
+     * on the result.
+     *
+     * <b>Note</b>: This is a blocking function and callers should be aware that the
+     * implementation communicates over a network. The facility in the
+     * {@link javax.security.auth.spi.LoginModule} from which this is ultimately called
+     * does not provide an asynchronous approach.
+     *
+     * @return Non-<code>null</code> JWT access token string
+     *
+     * @throws IOException Thrown on errors related to IO during retrieval
+     */
+
+    @Override
+    public String retrieve() throws IOException {
+        String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
+        String requestBody = formatRequestBody(scope);
+
+        Retry<String> retry = new Retry<>(Time.SYSTEM,
+            loginRetryBackoffMs,
+            loginRetryBackoffMaxMs);
+
+        Map<String, String> headers = Collections.singletonMap(AUTHORIZATION_HEADER, authorizationHeader);
+
+        String responseBody = retry.execute(() -> {
+            HttpURLConnection con = (HttpURLConnection) new URL(tokenEndpointUri).openConnection();
+
+            if (sslSocketFactory != null && con instanceof HttpsURLConnection)
+                ((HttpsURLConnection) con).setSSLSocketFactory(sslSocketFactory);
+
+            try {
+                return post(con, headers, requestBody, loginConnectTimeoutMs, loginReadTimeoutMs);
+            } finally {
+                con.disconnect();
+            }
+        });
+        log.debug("retrieve - responseBody: {}", responseBody);
+
+        return parseAccessToken(responseBody);
+    }
+
+    public static String post(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        handleInput(con, headers, requestBody, connectTimeoutMs, readTimeoutMs);
+        return handleOutput(con);
+    }
+
+    private static void handleInput(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        log.debug("handleInput - starting post for {}", con.getURL());
+        con.setRequestMethod("POST");
+        con.setRequestProperty("Accept", "application/json");
+
+        if (headers != null) {
+            for (Map.Entry<String, String> header : headers.entrySet())
+                con.setRequestProperty(header.getKey(), header.getValue());
+        }
+
+        con.setRequestProperty("Cache-Control", "no-cache");
+
+        if (requestBody != null) {
+            con.setRequestProperty("Content-Length", String.valueOf(requestBody.length()));
+            con.setDoOutput(true);
+        }
+
+        con.setUseCaches(false);
+
+        if (connectTimeoutMs != null)
+            con.setConnectTimeout(connectTimeoutMs);
+
+        if (readTimeoutMs != null)
+            con.setReadTimeout(readTimeoutMs);
+
+        log.debug("handleInput - preparing to connect to {}", con.getURL());
+        con.connect();
+
+        if (requestBody != null) {
+            try (OutputStream os = con.getOutputStream()) {
+                ByteArrayInputStream is = new ByteArrayInputStream(requestBody.getBytes(
+                    StandardCharsets.UTF_8));
+                log.debug("handleInput - preparing to write request body to {}", con.getURL());
+                copy(is, os);
+            }
+        }
+    }
+
+    static String handleOutput(final HttpURLConnection con) throws IOException {
+        int responseCode = con.getResponseCode();
+        log.debug("handleOutput - responseCode: {}", responseCode);
+
+        String responseBody = null;
+
+        try (InputStream is = con.getInputStream()) {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            log.debug("handleOutput - preparing to read response body from {}", con.getURL());
+            copy(is, os);
+            responseBody = os.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
+            log.warn("handleOutput - error retrieving data", e);
+        }
+
+        if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+            if (responseBody == null || responseBody.isEmpty())
+                throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %s from %s", responseCode, con.getURL()));
+
+            log.debug("handleOutput - responseCode: {}, response: {}", responseCode, responseBody);
+
+            return responseBody;
+        } else {
+            log.warn("handleOutput - error response code: {}, error response body: {}", responseCode, responseBody);
+
+            if (UNRETRYABLE_HTTP_CODES.contains(responseCode)) {
+                // We know that this is a non-transient error, so let's not keep retrying the
+                // request unnecessarily.
+                throw new UnretryableException(new IOException(String.format("The response code %s was encountered reading the token endpoint response; will not attempt further retries", responseCode)));
+            } else {
+                // We don't know if this is a transient (retryable) error or not, so let's assume
+                // it is.
+                throw new IOException(String.format("The unexpected response code %s was encountered reading the token endpoint response", responseCode));
+            }
+        }
+    }
+
+    static void copy(InputStream is, OutputStream os) throws IOException {
+        byte[] buf = new byte[4096];
+        int b;
+
+        while ((b = is.read(buf)) != -1)
+            os.write(buf, 0, b);
+    }
+
+    static String parseAccessToken(String responseBody) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode rootNode = mapper.readTree(responseBody);
+        JsonNode accessTokenNode = rootNode.at("/access_token");
+
+        if (accessTokenNode == null)
+            throw new IOException("The token endpoint response did not contain an access_token value");

Review comment:
       Should we include responseBody in the exception?
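
       If the body is included, it may be worth bounding its length, since an error page from the provider can be large. A rough sketch of what that could look like follows; the class `TokenResponseErrors`, the helper names, and the 200-character cap are all assumptions made for this example, not code from the PR.

```java
import java.io.IOException;

final class TokenResponseErrors {

    // Hypothetical cap so that a large response body does not flood the logs.
    static final int MAX_BODY_CHARS = 200;

    /** Shortens the body to at most MAX_BODY_CHARS characters, marking any cut with "...". */
    static String abbreviate(String body) {
        if (body == null)
            return "null";

        if (body.length() <= MAX_BODY_CHARS)
            return body;

        return body.substring(0, MAX_BODY_CHARS) + "...";
    }

    /** Builds the exception for a response without an access_token, quoting the (abbreviated) body. */
    static IOException missingAccessToken(String responseBody) {
        return new IOException(String.format(
            "The token endpoint response did not contain an access_token value. Response: (%s)",
            abbreviate(responseBody)));
    }

    public static void main(String[] args) {
        System.out.println(missingAccessToken("{\"error\":\"invalid_client\"}").getMessage());
    }
}
```

       Whether the raw body is safe to surface depends on what the provider returns, so a real change might also want to consider redaction.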

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ConfigurationUtils</code> is a utility class to perform basic configuration-related
+ * logic and is separated out here for easier, more direct testing.
+ */
+
+public class ConfigurationUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class);
+
+    private final Map<String, ?> configs;
+
+    private final String prefix;
+
+    public ConfigurationUtils(Map<String, ?> configs) {
+        this(configs, null);
+    }
+
+    public ConfigurationUtils(Map<String, ?> configs, String saslMechanism) {
+        this.configs = configs;
+
+        if (saslMechanism != null && !saslMechanism.trim().isEmpty())
+            this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim());
+        else
+            this.prefix = null;
+    }
+
+    public Map<String, ?> getSslClientConfig(String uriConfigName) {
+        String urlConfigValue = get(uriConfigName);
+
+        if (urlConfigValue == null || urlConfigValue.trim().isEmpty())
+            throw new ConfigException(String.format("The OAuth configuration option %s is required", uriConfigName));
+
+        URL url;
+
+        try {
+            url = new URL(urlConfigValue);
+        } catch (IOException e) {
+            throw new ConfigException(String.format("The OAuth configuration option %s was not a valid URL (%s)", uriConfigName, urlConfigValue));
+        }
+
+        if (!url.getProtocol().equalsIgnoreCase("https")) {
+            log.warn("Not creating SSL socket factory as URL for {} ({}) is not SSL-/TLS-based", uriConfigName, url);
+            return null;
+        }
+
+        ConfigDef sslConfigDef = new ConfigDef();
+        sslConfigDef.withClientSslSupport();
+        AbstractConfig sslClientConfig = new AbstractConfig(sslConfigDef, configs);
+        return sslClientConfig.values();
+    }
+
+    public SSLSocketFactory createSSLSocketFactory(String uriConfigName) {
+        Map<String, ?> sslClientConfig = getSslClientConfig(uriConfigName);
+
+        if (sslClientConfig == null) {
+            log.warn("Requesting SSL client socket factory but SSL configs were null");

Review comment:
       Do we need this warning, since we already log a warning in getSslClientConfig()?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactory.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
+
+import java.net.URI;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+
+public class AccessTokenRetrieverFactory  {
+
+    /**
+     * Create an {@link AccessTokenRetriever} from the given SASL and JAAS configuration.
+     *
+     * <b>Note</b>: the returned <code>AccessTokenRetriever</code> is <em>not</em> initialized
+     * here and must be done by the caller prior to use.
+     *
+     * @param configs    SASL configuration
+     * @param jaasConfig JAAS configuration
+     *
+     * @return Non-<code>null</code> {@link AccessTokenRetriever}
+     */
+
+    public static AccessTokenRetriever create(Map<String, ?> configs, Map<String, Object> jaasConfig) {
+        return create(configs, null, jaasConfig);
+    }
+
+    public static AccessTokenRetriever create(Map<String, ?> configs, String saslMechanism, Map<String, Object> jaasConfig) {
+        ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
+        URI tokenEndpointUri = cu.validateUri(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI);
+
+        if (tokenEndpointUri.getScheme().toLowerCase(Locale.ROOT).equals("file")) {
+            return new FileTokenRetriever(cu.validateFile(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI));
+        } else {
+            ConfigurationUtils jaasCu = new ConfigurationUtils(jaasConfig);
+            String clientId = jaasCu.validateString(CLIENT_ID_CONFIG);
+            String clientSecret = jaasCu.validateString(CLIENT_SECRET_CONFIG);
+            String scope = jaasCu.get(SCOPE_CONFIG);
+
+            SSLSocketFactory sslSocketFactory = cu.createSSLSocketFactory(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI);
+
+            return new HttpAccessTokenRetriever(clientId,
+                clientSecret,
+                scope,
+                sslSocketFactory,
+                tokenEndpointUri.toString(),
+                cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS),
+                cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
+                cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false, null),

Review comment:
       This could just use the overload without the min value. Ditto below.
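
       To illustrate the suggestion, here is a small sketch of a two-argument overload that delegates to the three-argument form. It assumes, as the comment implies, that the third argument is the minimum allowed value. The class `ValidateIntegerSketch` and its method bodies are hypothetical and do not reproduce the actual `ConfigurationUtils` implementation.

```java
import java.util.HashMap;
import java.util.Map;

final class ValidateIntegerSketch {

    private final Map<String, ?> configs;

    ValidateIntegerSketch(Map<String, ?> configs) {
        this.configs = configs;
    }

    /** Shorter overload: no minimum is enforced. */
    Integer validateInteger(String name, boolean isRequired) {
        return validateInteger(name, isRequired, null);
    }

    /** Full form: a null min means "no lower bound". */
    Integer validateInteger(String name, boolean isRequired, Integer min) {
        Object raw = configs.get(name);

        if (raw == null) {
            if (isRequired)
                throw new IllegalArgumentException(String.format("The option %s is required", name));

            return null;
        }

        if (!(raw instanceof Integer))
            throw new IllegalArgumentException(String.format("The option %s must be an integer", name));

        Integer value = (Integer) raw;

        if (min != null && value < min)
            throw new IllegalArgumentException(String.format("The option %s must be at least %d", name, min));

        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("sasl.login.connect.timeout.ms", 10000);
        ValidateIntegerSketch cu = new ValidateIntegerSketch(configs);
        // Call site reads the same as before, minus the trailing null.
        System.out.println(cu.validateInteger("sasl.login.connect.timeout.ms", false));
    }
}
```

       With such an overload, the call sites for the connect and read timeouts no longer need to pass an explicit `null`.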

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>HttpAccessTokenRetriever</code> is an {@link AccessTokenRetriever} that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+    private static final Set<Integer> UNRETRYABLE_HTTP_CODES;
+
+    public static final String AUTHORIZATION_HEADER = "Authorization";
+
+    static {
+        // This does not have to be an exhaustive list. There are other HTTP codes that
+        // are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585)
+        // that we won't worry about yet. The worst case if a status code is missing from
+        // this set is that the request will be retried.
+        UNRETRYABLE_HTTP_CODES = new HashSet<>();
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+    }
+
+    private final String clientId;
+
+    private final String clientSecret;
+
+    private final String scope;
+
+    private final SSLSocketFactory sslSocketFactory;
+
+    private final String tokenEndpointUri;
+
+    private final long loginRetryBackoffMs;
+
+    private final long loginRetryBackoffMaxMs;
+
+    private final Integer loginConnectTimeoutMs;
+
+    private final Integer loginReadTimeoutMs;
+
+    public HttpAccessTokenRetriever(String clientId,
+        String clientSecret,
+        String scope,
+        SSLSocketFactory sslSocketFactory,
+        String tokenEndpointUri,
+        long loginRetryBackoffMs,
+        long loginRetryBackoffMaxMs,
+        Integer loginConnectTimeoutMs,
+        Integer loginReadTimeoutMs) {
+        this.clientId = Objects.requireNonNull(clientId);
+        this.clientSecret = Objects.requireNonNull(clientSecret);
+        this.scope = scope;
+        this.sslSocketFactory = sslSocketFactory;
+        this.tokenEndpointUri = Objects.requireNonNull(tokenEndpointUri);
+        this.loginRetryBackoffMs = loginRetryBackoffMs;
+        this.loginRetryBackoffMaxMs = loginRetryBackoffMaxMs;
+        this.loginConnectTimeoutMs = loginConnectTimeoutMs;
+        this.loginReadTimeoutMs = loginReadTimeoutMs;
+    }
+
+    /**
+     * Retrieves a JWT access token in its serialized three-part form. The implementation
+     * is free to determine how it should be retrieved but should not perform validation
+     * on the result.
+     *
+     * <b>Note</b>: This is a blocking function and callers should be aware that the
+     * implementation communicates over a network. The facility in the
+     * {@link javax.security.auth.spi.LoginModule} from which this is ultimately called
+     * does not provide an asynchronous approach.
+     *
+     * @return Non-<code>null</code> JWT access token string
+     *
+     * @throws IOException Thrown on errors related to IO during retrieval
+     */
+
+    @Override
+    public String retrieve() throws IOException {
+        String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
+        String requestBody = formatRequestBody(scope);
+
+        Retry<String> retry = new Retry<>(Time.SYSTEM,
+            loginRetryBackoffMs,
+            loginRetryBackoffMaxMs);
+
+        Map<String, String> headers = Collections.singletonMap(AUTHORIZATION_HEADER, authorizationHeader);
+
+        String responseBody = retry.execute(() -> {
+            HttpURLConnection con = (HttpURLConnection) new URL(tokenEndpointUri).openConnection();
+
+            if (sslSocketFactory != null && con instanceof HttpsURLConnection)
+                ((HttpsURLConnection) con).setSSLSocketFactory(sslSocketFactory);
+
+            try {
+                return post(con, headers, requestBody, loginConnectTimeoutMs, loginReadTimeoutMs);
+            } finally {
+                con.disconnect();
+            }
+        });
+        log.debug("retrieve - responseBody: {}", responseBody);
+
+        return parseAccessToken(responseBody);
+    }
+
+    public static String post(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        handleInput(con, headers, requestBody, connectTimeoutMs, readTimeoutMs);
+        return handleOutput(con);
+    }
+
+    private static void handleInput(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        log.debug("handleInput - starting post for {}", con.getURL());
+        con.setRequestMethod("POST");
+        con.setRequestProperty("Accept", "application/json");
+
+        if (headers != null) {
+            for (Map.Entry<String, String> header : headers.entrySet())
+                con.setRequestProperty(header.getKey(), header.getValue());
+        }
+
+        con.setRequestProperty("Cache-Control", "no-cache");
+
+        if (requestBody != null) {
+            con.setRequestProperty("Content-Length", String.valueOf(requestBody.length()));
+            con.setDoOutput(true);
+        }
+
+        con.setUseCaches(false);
+
+        if (connectTimeoutMs != null)
+            con.setConnectTimeout(connectTimeoutMs);
+
+        if (readTimeoutMs != null)
+            con.setReadTimeout(readTimeoutMs);
+
+        log.debug("handleInput - preparing to connect to {}", con.getURL());
+        con.connect();
+
+        if (requestBody != null) {
+            try (OutputStream os = con.getOutputStream()) {
+                ByteArrayInputStream is = new ByteArrayInputStream(requestBody.getBytes(
+                    StandardCharsets.UTF_8));
+                log.debug("handleInput - preparing to write request body to {}", con.getURL());
+                copy(is, os);
+            }
+        }
+    }
+
+    static String handleOutput(final HttpURLConnection con) throws IOException {
+        int responseCode = con.getResponseCode();
+        log.debug("handleOutput - responseCode: {}", responseCode);
+
+        String responseBody = null;
+
+        try (InputStream is = con.getInputStream()) {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            log.debug("handleOutput - preparing to read response body from {}", con.getURL());
+            copy(is, os);
+            responseBody = os.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
+            log.warn("handleOutput - error retrieving data", e);
+        }
+
+        if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+            if (responseBody == null || responseBody.isEmpty())
+                throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %s from %s", responseCode, con.getURL()));
+
+            log.debug("handleOutput - responseCode: {}, response: {}", responseCode, responseBody);
+
+            return responseBody;
+        } else {
+            log.warn("handleOutput - error response code: {}, error response body: {}", responseCode, responseBody);
+
+            if (UNRETRYABLE_HTTP_CODES.contains(responseCode)) {
+                // We know that this is a non-transient error, so let's not keep retrying the
+                // request unnecessarily.
+                throw new UnretryableException(new IOException(String.format("The response code %s was encountered reading the token endpoint response; will not attempt further retries", responseCode)));
+            } else {
+                // We don't know if this is a transient (retryable) error or not, so let's assume
+                // it is.
+                throw new IOException(String.format("The unexpected response code %s was encountered reading the token endpoint response", responseCode));
+            }
+        }
+    }
+
+    static void copy(InputStream is, OutputStream os) throws IOException {
+        byte[] buf = new byte[4096];
+        int b;
+
+        while ((b = is.read(buf)) != -1)
+            os.write(buf, 0, b);
+    }
+
+    static String parseAccessToken(String responseBody) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode rootNode = mapper.readTree(responseBody);
+        JsonNode accessTokenNode = rootNode.at("/access_token");
+
+        if (accessTokenNode == null)
+            throw new IOException("The token endpoint response did not contain an access_token value");
+
+        return sanitizeString("The token endpoint response access_token", accessTokenNode.textValue());
+    }
+
+    static String formatAuthorizationHeader(String clientId, String clientSecret) throws IOException {
+        clientId = sanitizeString("The token endpoint request clientId", clientId);
+        clientSecret = sanitizeString("The token endpoint request clientId", clientSecret);
+
+        String s = String.format("%s:%s", clientId, clientSecret);
+        String encoded = Base64.getUrlEncoder().encodeToString(Utils.utf8(s));
+        return String.format("Basic %s", encoded);
+    }
+
+    static String formatRequestBody(String scope) throws IOException {
+        try {
+            StringBuilder requestParameters = new StringBuilder();
+            requestParameters.append("grant_type=client_credentials");
+
+            if (scope != null && !scope.trim().isEmpty()) {
+                scope = scope.trim();
+                String encodedScope = URLEncoder.encode(scope, StandardCharsets.UTF_8.name());
+                requestParameters.append("&scope=").append(encodedScope);
+            }
+
+            return requestParameters.toString();
+        } catch (UnsupportedEncodingException e) {
+            // The world has gone crazy!
+            throw new IOException(String.format("Encoding %s not supported", StandardCharsets.UTF_8.name()));
+        }
+    }
+
+    private static String sanitizeString(String name, String value) throws IOException {
+        if (value == null)
+            throw new IOException(String.format("%s value must be non-null", name));

Review comment:
       Hmm, this is not really an IOException. Perhaps it should be something like IllegalArgumentException. Ditto below.
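
       For illustration, a minimal sketch of that suggestion, assuming the null and empty checks are the only validation involved (the rest of the method is not visible in the quoted hunk). The class name `SanitizeSketch` is hypothetical; only the two checks and their messages come from the diff.

```java
public class SanitizeSketch {

    // Hypothetical variant of sanitizeString from the quoted diff: a bad argument
    // is reported with an unchecked IllegalArgumentException rather than a checked
    // IOException, since no I/O has failed at this point.
    static String sanitizeString(String name, String value) {
        if (value == null)
            throw new IllegalArgumentException(String.format("%s value must be non-null", name));

        if (value.isEmpty())
            throw new IllegalArgumentException(String.format("%s value must be non-empty", name));

        // Any further processing done by the real method is not shown in the
        // quoted hunk, so this sketch returns the value unchanged.
        return value;
    }

    public static void main(String[] args) {
        System.out.println(sanitizeString("The token endpoint request clientId", "my-client"));

        try {
            sanitizeString("The token endpoint request clientSecret", null);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

       With this change, callers such as `formatAuthorizationHeader` would presumably no longer need `throws IOException` for the validation path alone, though that depends on what else they do.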

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>HttpAccessTokenRetriever</code> is an {@link AccessTokenRetriever} that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+    private static final Set<Integer> UNRETRYABLE_HTTP_CODES;
+
+    public static final String AUTHORIZATION_HEADER = "Authorization";
+
+    static {
+        // This does not have to be an exhaustive list. There are other HTTP codes that
+        // are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585)
+        // that we won't worry about yet. The worst case if a status code is missing from
+        // this set is that the request will be retried.
+        UNRETRYABLE_HTTP_CODES = new HashSet<>();
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+    }
+
+    private final String clientId;
+
+    private final String clientSecret;
+
+    private final String scope;
+
+    private final SSLSocketFactory sslSocketFactory;
+
+    private final String tokenEndpointUri;
+
+    private final long loginRetryBackoffMs;
+
+    private final long loginRetryBackoffMaxMs;
+
+    private final Integer loginConnectTimeoutMs;
+
+    private final Integer loginReadTimeoutMs;
+
+    public HttpAccessTokenRetriever(String clientId,
+        String clientSecret,
+        String scope,
+        SSLSocketFactory sslSocketFactory,
+        String tokenEndpointUri,
+        long loginRetryBackoffMs,
+        long loginRetryBackoffMaxMs,
+        Integer loginConnectTimeoutMs,
+        Integer loginReadTimeoutMs) {
+        this.clientId = Objects.requireNonNull(clientId);
+        this.clientSecret = Objects.requireNonNull(clientSecret);
+        this.scope = scope;
+        this.sslSocketFactory = sslSocketFactory;
+        this.tokenEndpointUri = Objects.requireNonNull(tokenEndpointUri);
+        this.loginRetryBackoffMs = loginRetryBackoffMs;
+        this.loginRetryBackoffMaxMs = loginRetryBackoffMaxMs;
+        this.loginConnectTimeoutMs = loginConnectTimeoutMs;
+        this.loginReadTimeoutMs = loginReadTimeoutMs;
+    }
+
+    /**
+     * Retrieves a JWT access token in its serialized three-part form. The implementation
+     * is free to determine how it should be retrieved but should not perform validation
+     * on the result.
+     *
+     * <b>Note</b>: This is a blocking function and callers should be aware that the
+     * implementation communicates over a network. The facility in the
+     * {@link javax.security.auth.spi.LoginModule} from which this is ultimately called
+     * does not provide an asynchronous approach.
+     *
+     * @return Non-<code>null</code> JWT access token string
+     *
+     * @throws IOException Thrown on errors related to IO during retrieval
+     */
+
+    @Override
+    public String retrieve() throws IOException {
+        String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
+        String requestBody = formatRequestBody(scope);
+
+        Retry<String> retry = new Retry<>(Time.SYSTEM,
+            loginRetryBackoffMs,
+            loginRetryBackoffMaxMs);
+
+        Map<String, String> headers = Collections.singletonMap(AUTHORIZATION_HEADER, authorizationHeader);
+
+        String responseBody = retry.execute(() -> {
+            HttpURLConnection con = (HttpURLConnection) new URL(tokenEndpointUri).openConnection();
+
+            if (sslSocketFactory != null && con instanceof HttpsURLConnection)
+                ((HttpsURLConnection) con).setSSLSocketFactory(sslSocketFactory);
+
+            try {
+                return post(con, headers, requestBody, loginConnectTimeoutMs, loginReadTimeoutMs);
+            } finally {
+                con.disconnect();
+            }
+        });
+        log.debug("retrieve - responseBody: {}", responseBody);
+
+        return parseAccessToken(responseBody);
+    }
+
+    public static String post(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        handleInput(con, headers, requestBody, connectTimeoutMs, readTimeoutMs);
+        return handleOutput(con);
+    }
+
+    private static void handleInput(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        log.debug("handleInput - starting post for {}", con.getURL());
+        con.setRequestMethod("POST");
+        con.setRequestProperty("Accept", "application/json");
+
+        if (headers != null) {
+            for (Map.Entry<String, String> header : headers.entrySet())
+                con.setRequestProperty(header.getKey(), header.getValue());
+        }
+
+        con.setRequestProperty("Cache-Control", "no-cache");
+
+        if (requestBody != null) {
+            con.setRequestProperty("Content-Length", String.valueOf(requestBody.length()));
+            con.setDoOutput(true);
+        }
+
+        con.setUseCaches(false);
+
+        if (connectTimeoutMs != null)
+            con.setConnectTimeout(connectTimeoutMs);
+
+        if (readTimeoutMs != null)
+            con.setReadTimeout(readTimeoutMs);
+
+        log.debug("handleInput - preparing to connect to {}", con.getURL());
+        con.connect();
+
+        if (requestBody != null) {
+            try (OutputStream os = con.getOutputStream()) {
+                ByteArrayInputStream is = new ByteArrayInputStream(requestBody.getBytes(
+                    StandardCharsets.UTF_8));
+                log.debug("handleInput - preparing to write request body to {}", con.getURL());
+                copy(is, os);
+            }
+        }
+    }
+
+    static String handleOutput(final HttpURLConnection con) throws IOException {
+        int responseCode = con.getResponseCode();
+        log.debug("handleOutput - responseCode: {}", responseCode);
+
+        String responseBody = null;
+
+        try (InputStream is = con.getInputStream()) {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            log.debug("handleOutput - preparing to read response body from {}", con.getURL());
+            copy(is, os);
+            responseBody = os.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
+            log.warn("handleOutput - error retrieving data", e);
+        }
+
+        if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+            if (responseBody == null || responseBody.isEmpty())
+                throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %s from %s", responseCode, con.getURL()));
+
+            log.debug("handleOutput - responseCode: {}, response: {}", responseCode, responseBody);
+
+            return responseBody;
+        } else {
+            log.warn("handleOutput - error response code: {}, error response body: {}", responseCode, responseBody);
+
+            if (UNRETRYABLE_HTTP_CODES.contains(responseCode)) {
+                // We know that this is a non-transient error, so let's not keep retrying the
+                // request unnecessarily.
+                throw new UnretryableException(new IOException(String.format("The response code %s was encountered reading the token endpoint response; will not attempt further retries", responseCode)));
+            } else {
+                // We don't know if this is a transient (retryable) error or not, so let's assume
+                // it is.
+                throw new IOException(String.format("The unexpected response code %s was encountered reading the token endpoint response", responseCode));
+            }
+        }
+    }
+
+    static void copy(InputStream is, OutputStream os) throws IOException {
+        byte[] buf = new byte[4096];
+        int b;
+
+        while ((b = is.read(buf)) != -1)
+            os.write(buf, 0, b);
+    }
+
+    static String parseAccessToken(String responseBody) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode rootNode = mapper.readTree(responseBody);
+        JsonNode accessTokenNode = rootNode.at("/access_token");
+
+        if (accessTokenNode == null)
+            throw new IOException("The token endpoint response did not contain an access_token value");
+
+        return sanitizeString("The token endpoint response access_token", accessTokenNode.textValue());
+    }
+
+    static String formatAuthorizationHeader(String clientId, String clientSecret) throws IOException {
+        clientId = sanitizeString("The token endpoint request clientId", clientId);
+        clientSecret = sanitizeString("The token endpoint request clientId", clientSecret);
+
+        String s = String.format("%s:%s", clientId, clientSecret);
+        String encoded = Base64.getUrlEncoder().encodeToString(Utils.utf8(s));
+        return String.format("Basic %s", encoded);
+    }
+
+    static String formatRequestBody(String scope) throws IOException {
+        try {
+            StringBuilder requestParameters = new StringBuilder();
+            requestParameters.append("grant_type=client_credentials");
+
+            if (scope != null && !scope.trim().isEmpty()) {
+                scope = scope.trim();
+                String encodedScope = URLEncoder.encode(scope, StandardCharsets.UTF_8.name());
+                requestParameters.append("&scope=").append(encodedScope);
+            }
+
+            return requestParameters.toString();
+        } catch (UnsupportedEncodingException e) {
+            // The world has gone crazy!
+            throw new IOException(String.format("Encoding %s not supported", StandardCharsets.UTF_8.name()));
+        }
+    }
+
+    private static String sanitizeString(String name, String value) throws IOException {
+        if (value == null)
+            throw new IOException(String.format("%s value must be non-null", name));
+
+        if (value.isEmpty())
+            throw new IOException(String.format("%s value must be non-empty", name));

Review comment:
       Should `name` be `value` here?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.jose4j.jwt.JwtClaims;
+import org.jose4j.jwt.MalformedClaimException;
+import org.jose4j.jwt.NumericDate;
+import org.jose4j.jwt.ReservedClaimNames;
+import org.jose4j.jwt.consumer.InvalidJwtException;
+import org.jose4j.jwt.consumer.JwtConsumer;
+import org.jose4j.jwt.consumer.JwtConsumerBuilder;
+import org.jose4j.jwt.consumer.JwtContext;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used
+ * by the broker to perform more extensive validation of the JWT access token that is received
+ * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's
+ * token endpoint.
+ *
+ * The validation steps performed (primary by the jose4j library) are:
+ *
+ * <ol>
+ *     <li>
+ *         Basic structural validation of the <code>b64token</code> value as defined in
+ *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a>
+ *     </li>
+ *     <li>Basic conversion of the token into an in-memory data structure</li>
+ *     <li>
+ *         Presence of scope, <code>exp</code>, subject, <code>iss</code>, and
+ *         <code>iat</code> claims
+ *     </li>
+ *     <li>
+ *         Signature matching validation against the <code>kid</code> and those provided b
+ *         the OAuth/OIDC provider's JWKS
+ *     </li>
+ * </ol>
+ */
+
+public class ValidatorAccessTokenValidator implements AccessTokenValidator {
+
+    private static final Logger log = LoggerFactory.getLogger(ValidatorAccessTokenValidator.class);
+
+    private final JwtConsumer jwtConsumer;
+
+    private final String scopeClaimName;
+
+    private final String subClaimName;
+
+    /**
+     * Creates a new ValidatorAccessTokenValidator that will be used by the broker for more
+     * thorough validation of the JWT.
+     *
+     * @param clockSkew               The optional value (in seconds) to allow for differences
+     *                                between the time of the OAuth/OIDC identity provider and
+     *                                the broker. If <code>null</code> is provided, the broker
+     *                                and the OAUth/OIDC identity provider are assumed to have
+     *                                very close clock settings.
+     * @param expectedAudiences       The (optional) set the broker will use to verify that
+     *                                the JWT was issued for one of the expected audiences.
+     *                                The JWT will be inspected for the standard OAuth
+     *                                <code>aud</code> claim and if this value is set, the
+     *                                broker will match the value from JWT's <code>aud</code>
+     *                                claim to see if there is an exact match. If there is no
+     *                                match, the broker will reject the JWT and authentication
+     *                                will fail. May be <code>null</code> to not perform an

Review comment:
       `May be <code>null</code> to not perform an` seems incomplete.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.jose4j.jwt.JwtClaims;
+import org.jose4j.jwt.MalformedClaimException;
+import org.jose4j.jwt.NumericDate;
+import org.jose4j.jwt.ReservedClaimNames;
+import org.jose4j.jwt.consumer.InvalidJwtException;
+import org.jose4j.jwt.consumer.JwtConsumer;
+import org.jose4j.jwt.consumer.JwtConsumerBuilder;
+import org.jose4j.jwt.consumer.JwtContext;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used
+ * by the broker to perform more extensive validation of the JWT access token that is received

Review comment:
       It seems that ValidatorAccessTokenValidator is used for validation in the client too?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler {

Review comment:
       Could we add a comment to this class?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+    public static final String CLIENT_ID_CONFIG = "clientId";
+    public static final String CLIENT_SECRET_CONFIG = "clientSecret";
+    public static final String SCOPE_CONFIG = "scope";
+
+    public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " +
+        "client ID to uniquely identify the service account to use for authentication for " +
+        "this client. The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " +
+        "value and is provided to the OAuth provider using the OAuth " +
+        "clientcredentials grant type.";
+
+    public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " +
+        "client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " +
+        "account and identifies the service account to use for authentication for " +
+        "this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " +
+        "value and is provided to the OAuth provider using the OAuth " +
+        "clientcredentials grant type.";
+
+    public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " +
+        "token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need to specify an " +
+        "OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " +
+        "include with the login request.";
+
+    private static final String EXTENSION_PREFIX = "extension_";
+
+    private Map<String, Object> moduleOptions;
+
+    private AccessTokenRetriever accessTokenRetriever;
+
+    private AccessTokenValidator accessTokenValidator;
+
+    private boolean isInitialized = false;
+
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size()));
+
+        moduleOptions = Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+        AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, saslMechanism, moduleOptions);

Review comment:
       saslMechanism is used for the listener prefix. Since listeners are a broker-side concept, it seems that we don't need to pass this in on the client? Ditto below.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+    private CloseableVerificationKeyResolver verificationKeyResolver;
+
+    private AccessTokenValidator accessTokenValidator;
+
+    private boolean isInitialized = false;
+
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism);
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver);

Review comment:
       saslMechanism is used for the listener prefix. It seems that we need to pass in the listener name too?
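
       To illustrate the concern: broker-side SASL settings can be scoped with a prefix of the form listener.name.<listener>.<mechanism>., so the mechanism alone is not enough to build it. A minimal sketch, assuming a lookup that prefers the prefixed key and falls back to the unprefixed one. ListenerPrefixSketch and the key "example.key" are hypothetical and not part of this PR.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of the PR: resolves a config value by checking the
// listener- and mechanism-prefixed key first and falling back to the unprefixed key.
final class ListenerPrefixSketch {

    // Builds "listener.name.<listener>.<mechanism>." in lower case, the prefix form
    // used for per-listener SASL settings on the broker.
    static String prefix(String listenerName, String saslMechanism) {
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT) + "."
            + saslMechanism.toLowerCase(Locale.ROOT) + ".";
    }

    // Prefers the listener-scoped value; uses the unprefixed value when none is set.
    static Object lookup(Map<String, ?> configs, String listenerName, String saslMechanism, String key) {
        Object prefixed = configs.get(prefix(listenerName, saslMechanism) + key);
        return prefixed != null ? prefixed : configs.get(key);
    }
}
```

       With only saslMechanism available, the factory has no way to form the first half of that prefix, which is why the listener name probably needs to be passed in as well.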

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JwksFileVerificationKeyResolver.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.security.Key;
+import java.util.List;
+import org.apache.kafka.common.utils.Utils;
+import org.jose4j.jwk.JsonWebKeySet;
+import org.jose4j.jws.JsonWebSignature;
+import org.jose4j.jwx.JsonWebStructure;
+import org.jose4j.keys.resolvers.JwksVerificationKeyResolver;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.jose4j.lang.JoseException;
+import org.jose4j.lang.UnresolvableKeyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>JwksFileVerificationKeyResolver</code> is a {@link VerificationKeyResolver} implementation
+ * that will load the JWKS from the given file system directory.

Review comment:
       Could we describe the file format in code and the KIP?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.jose4j.jwt.JwtClaims;
+import org.jose4j.jwt.MalformedClaimException;
+import org.jose4j.jwt.NumericDate;
+import org.jose4j.jwt.ReservedClaimNames;
+import org.jose4j.jwt.consumer.InvalidJwtException;
+import org.jose4j.jwt.consumer.JwtConsumer;
+import org.jose4j.jwt.consumer.JwtConsumerBuilder;
+import org.jose4j.jwt.consumer.JwtContext;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used
+ * by the broker to perform more extensive validation of the JWT access token that is received
+ * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's
+ * token endpoint.
+ *
+ * The validation steps performed (primary by the jose4j library) are:
+ *
+ * <ol>
+ *     <li>
+ *         Basic structural validation of the <code>b64token</code> value as defined in
+ *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a>
+ *     </li>
+ *     <li>Basic conversion of the token into an in-memory data structure</li>
+ *     <li>
+ *         Presence of scope, <code>exp</code>, subject, <code>iss</code>, and
+ *         <code>iat</code> claims
+ *     </li>
+ *     <li>
+ *         Signature matching validation against the <code>kid</code> and those provided b

Review comment:
       provided b => provided by

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+    private CloseableVerificationKeyResolver verificationKeyResolver;
+
+    private AccessTokenValidator accessTokenValidator;
+
+    private boolean isInitialized = false;
+
+    @Override
+    public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+        CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism);
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver);
+        init(verificationKeyResolver, accessTokenValidator);
+    }
+
+    public void init(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
+        this.verificationKeyResolver = verificationKeyResolver;
+        this.accessTokenValidator = accessTokenValidator;
+
+        try {
+            verificationKeyResolver.init();
+        } catch (Exception e) {
+            throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e);
+        }
+
+        isInitialized = true;
+    }
+
+    @Override
+    public void close() {
+        if (verificationKeyResolver != null) {
+            try {
+                verificationKeyResolver.close();
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        checkInitialized();
+
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerValidatorCallback) {
+                handleValidatorCallback((OAuthBearerValidatorCallback) callback);
+            } else if (callback instanceof OAuthBearerExtensionsValidatorCallback) {
+                OAuthBearerExtensionsValidatorCallback extensionsCallback = (OAuthBearerExtensionsValidatorCallback) callback;
+                extensionsCallback.inputExtensions().map().forEach((extensionName, v) -> extensionsCallback.valid(extensionName));
+            } else {
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    private void handleValidatorCallback(OAuthBearerValidatorCallback callback) {
+        checkInitialized();
+
+        OAuthBearerToken token;
+
+        try {
+            token = accessTokenValidator.validate(callback.tokenValue());
+            log.debug("handle - token: {}", token);
+            callback.token(token);
+        } catch (ValidateException e) {
+            e.printStackTrace();

Review comment:
       In general, we shouldn't print to stdout/stderr (e.printStackTrace() writes to stderr) and should use logging instead.
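
       A rough sketch of the pattern I have in mind. It uses java.util.logging only so that it is self-contained; in this class the existing slf4j log field would presumably be used, e.g. log.warn(message, e). ValidationLoggingSketch, its nested ValidateException, and validate(...) are hypothetical stand-ins for the PR's types.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class ValidationLoggingSketch {

    private static final Logger LOG = Logger.getLogger(ValidationLoggingSketch.class.getName());

    // Hypothetical stand-in for the PR's ValidateException.
    static final class ValidateException extends RuntimeException {
        ValidateException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for AccessTokenValidator.validate(...).
    static String validate(String tokenValue) {
        if (tokenValue == null || tokenValue.isEmpty())
            throw new ValidateException("Token value is missing");
        return tokenValue;
    }

    // Returns the validated token, or null after logging the failure with its stack trace.
    static String handle(String tokenValue) {
        try {
            return validate(tokenValue);
        } catch (ValidateException e) {
            // Routed through the logging framework instead of e.printStackTrace().
            LOG.log(Level.WARNING, "Could not validate the access token", e);
            return null;
        }
    }
}
```

       Passing the exception as the last argument keeps the stack trace in the log output, so nothing is lost by dropping printStackTrace().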

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/PemDirectoryVerificationKeyResolver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.PublicKey;
+import java.security.interfaces.ECPublicKey;
+import java.security.interfaces.RSAPublicKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.common.utils.Utils;
+import org.jose4j.jwk.EllipticCurveJsonWebKey;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.jwk.RsaJsonWebKey;
+import org.jose4j.jws.JsonWebSignature;
+import org.jose4j.jwx.JsonWebStructure;
+import org.jose4j.keys.EcKeyUtil;
+import org.jose4j.keys.RsaKeyUtil;
+import org.jose4j.keys.resolvers.JwksVerificationKeyResolver;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.jose4j.lang.JoseException;
+import org.jose4j.lang.UnresolvableKeyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>PemVerificationKeyResolver</code> is a {@link VerificationKeyResolver} implementation
+ * that will load the PEM files from the given file system directory.
+ *
+ * The instance is configured with the directory name that contains one or more
+ * <a href=\"https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail\">public key files</a>

Review comment:
       Is https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail the right URL?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.jose4j.jwk.HttpsJwks;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.lang.JoseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or
+ * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's
+ * possible to receive a JWT that contains a <code>kid</code> that points to yet-unknown JWK,
+ * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice,
+ * keys are made available for some amount of time before they're used within JWTs.
+ *
+ * This instance is created and provided to the
+ * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using
+ * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then
+ * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of
+ * a JWT.
+ *
+ * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver
+ * @see org.jose4j.keys.resolvers.VerificationKeyResolver
+ * @see ValidatorAccessTokenValidator
+ */
+
+public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class);
+
+    private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16;
+
+    private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 60000;
+
+    private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000;
+
+    private static final int SHUTDOWN_TIMEOUT = 10;
+
+    private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS;
+
+    private final ScheduledExecutorService executorService;
+
+    private final long refreshMs;
+
+    private final ReadWriteLock refreshLock = new ReentrantReadWriteLock();
+
+    private final Map<String, Long> missingKeyIds;
+
+    private List<JsonWebKey> jsonWebKeys;
+
+    private boolean isInited;
+
+    /**
+     *
+     * @param location  HTTP/HTTPS endpoint from which to retrieve the JWKS based on
+     *                  the OAuth/OIDC standard
+     * @param refreshMs The number of milliseconds between refresh passes to connect
+     *                  to the OAuth/OIDC JWKS endpoint to retrieve the latest set
+     */
+
+    public RefreshingHttpsJwks(String location, long refreshMs) {
+        super(location);
+
+        if (refreshMs <= 0)
+            throw new IllegalArgumentException("JWKS validation key refresh configuration value retryWaitMs value must be positive");
+
+        setDefaultCacheDuration(refreshMs);
+
+        this.refreshMs = refreshMs;
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.missingKeyIds = new LinkedHashMap<String, Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
+                return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES;
+            }
+        };
+    }
+
+    @Override
+    public void init() throws IOException {
+        try {
+            log.debug("init started");
+
+            List<JsonWebKey> localJWKs;
+
+            try {
+                localJWKs = super.getJsonWebKeys();
+            } catch (JoseException e) {
+                throw new IOException("Could not refresh JWKS", e);
+            }
+
+            try {
+                refreshLock.writeLock().lock();
+                this.jsonWebKeys = localJWKs;

Review comment:
       Hmm, I am a bit confused. What is jsonWebKeys intended for? Why is it set during init() but not after refresh()?
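
       If jsonWebKeys is meant to be a local cache of the last fetched set, I would expect init() and refresh() to update it through the same path. A minimal sketch of that shape, assuming the field is a cache; CachedKeysSketch and the Supplier standing in for the JWKS fetch are hypothetical, and keys are plain strings here for brevity.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

final class CachedKeysSketch {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // Hypothetical stand-in for the HTTP/HTTPS call that retrieves the JWKS.
    private final Supplier<List<String>> fetcher;

    private List<String> keys = Collections.emptyList();

    CachedKeysSketch(Supplier<List<String>> fetcher) {
        this.fetcher = fetcher;
    }

    void init() {
        update();
    }

    void refresh() {
        update();
    }

    // Single update path shared by init() and refresh().
    private void update() {
        // Fetch outside the lock so readers are not blocked during the remote call.
        List<String> fetched = fetcher.get();

        // The lock is taken before the try so that unlock() only runs if lock() succeeded.
        lock.writeLock().lock();
        try {
            keys = fetched;
        } finally {
            lock.writeLock().unlock();
        }
    }

    List<String> keys() {
        lock.readLock().lock();
        try {
            return keys;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```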

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retry encapsulates the mechanism to perform a retry and then exponential
+ * backoff using provided wait times between attempts.
+ *
+ * @param <R> Result type
+ */
+
+public class Retry<R> {

Review comment:
       This and Retryable are not OAuth-specific. Perhaps they can be put in common.utils?
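
       For context, retry with exponential backoff can be written with no reference to OAuth types at all, which is why a shared package seems like a better home. A generic sketch under assumed names; RetrySketch.execute and its parameters are hypothetical, and the PR's Retry/Retryable API may well differ.

```java
import java.util.concurrent.Callable;

final class RetrySketch {

    // Calls the given task until it succeeds or maxAttempts is reached, doubling the
    // wait between attempts up to maxWaitMs. Rethrows the last failure when exhausted.
    static <R> R execute(Callable<R> call, int maxAttempts, long initialWaitMs, long maxWaitMs) throws Exception {
        if (maxAttempts < 1)
            throw new IllegalArgumentException("maxAttempts must be at least 1");

        long waitMs = initialWaitMs;
        Exception last = null;

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;

                if (attempt == maxAttempts)
                    break;

                // Back off before the next attempt, then double the wait (capped).
                Thread.sleep(waitMs);
                waitMs = Math.min(waitMs * 2, maxWaitMs);
            }
        }

        throw last;
    }
}
```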

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.jose4j.jwt.JwtClaims;
+import org.jose4j.jwt.MalformedClaimException;
+import org.jose4j.jwt.NumericDate;
+import org.jose4j.jwt.ReservedClaimNames;
+import org.jose4j.jwt.consumer.InvalidJwtException;
+import org.jose4j.jwt.consumer.JwtConsumer;
+import org.jose4j.jwt.consumer.JwtConsumerBuilder;
+import org.jose4j.jwt.consumer.JwtContext;
+import org.jose4j.keys.resolvers.VerificationKeyResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used
+ * by the broker to perform more extensive validation of the JWT access token that is received
+ * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's
+ * token endpoint.
+ *
+ * The validation steps performed (primary by the jose4j library) are:
+ *
+ * <ol>
+ *     <li>
+ *         Basic structural validation of the <code>b64token</code> value as defined in
+ *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a>
+ *     </li>
+ *     <li>Basic conversion of the token into an in-memory data structure</li>
+ *     <li>
+ *         Presence of scope, <code>exp</code>, subject, <code>iss</code>, and
+ *         <code>iat</code> claims
+ *     </li>
+ *     <li>
+ *         Signature matching validation against the <code>kid</code> and those provided b
+ *         the OAuth/OIDC provider's JWKS
+ *     </li>
+ * </ol>
+ */
+
+public class ValidatorAccessTokenValidator implements AccessTokenValidator {
+
+    private static final Logger log = LoggerFactory.getLogger(ValidatorAccessTokenValidator.class);
+
+    private final JwtConsumer jwtConsumer;
+
+    private final String scopeClaimName;
+
+    private final String subClaimName;
+
+    /**
+     * Creates a new ValidatorAccessTokenValidator that will be used by the broker for more
+     * thorough validation of the JWT.
+     *
+     * @param clockSkew               The optional value (in seconds) to allow for differences
+     *                                between the time of the OAuth/OIDC identity provider and
+     *                                the broker. If <code>null</code> is provided, the broker
+     *                                and the OAUth/OIDC identity provider are assumed to have
+     *                                very close clock settings.
+     * @param expectedAudiences       The (optional) set the broker will use to verify that
+     *                                the JWT was issued for one of the expected audiences.
+     *                                The JWT will be inspected for the standard OAuth
+     *                                <code>aud</code> claim and if this value is set, the
+     *                                broker will match the value from JWT's <code>aud</code>
+     *                                claim to see if there is an exact match. If there is no
+     *                                match, the broker will reject the JWT and authentication
+     *                                will fail. May be <code>null</code> to not perform an
+     * @param expectedIssuer          The (optional) value for the broker to use to verify that
+     *                                the JWT was created by the expected issuer. The JWT will
+     *                                be inspected for the standard OAuth <code>iss</code> claim
+     *                                and if this value is set, the broker will match it exactly
+     *                                against what is in the JWT's <code>iss</code> claim.
+     *                                If there is no match, the broker will reject the JWT and
+     *                                authentication will fail.

Review comment:
       What happens when expectedIssuer is null?
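
One plausible answer, sketched in plain Java without jose4j: treat a null expected issuer as "skip the issuer check" and otherwise require an exact match on the `iss` claim. This is an assumption about the intended behavior, not something the quoted Javadoc states; `IssuerCheck` and `issuerAccepted` are hypothetical names used only for illustration.

```java
import java.util.Map;

/** Hypothetical helper, not part of the PR: illustrates null-tolerant issuer matching. */
final class IssuerCheck {

    private IssuerCheck() {
    }

    /**
     * Returns true if the claims pass the issuer check.
     *
     * Assumption: a null expectedIssuer means the check is not configured and is skipped.
     */
    static boolean issuerAccepted(String expectedIssuer, Map<String, Object> claims) {
        if (expectedIssuer == null)
            return true; // no expected issuer configured, so nothing to compare against

        // Exact match only; a missing "iss" claim fails because equals(null) is false.
        return expectedIssuer.equals(claims.get("iss"));
    }
}
```

If that is the intended behavior, the Javadoc for this parameter could say so explicitly, in the same way the `expectedAudiences` text mentions `null`.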

##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     implementation libs.jacksonDataformatCsv
     implementation libs.jacksonJDK8Datatypes
     implementation libs.joptSimple
+    implementation libs.jose4j

Review comment:
       Is this needed since core depends on clients, which already has this dependency?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.jose4j.jwk.HttpsJwks;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.lang.JoseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or
+ * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's
+ * possible to receive a JWT that contains a <code>kid</code> that points to yet-unknown JWK,
+ * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice,
+ * keys are made available for some amount of time before they're used within JWTs.
+ *
+ * This instance is created and provided to the
+ * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using
+ * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then
+ * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of
+ * a JWT.
+ *
+ * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver
+ * @see org.jose4j.keys.resolvers.VerificationKeyResolver
+ * @see ValidatorAccessTokenValidator
+ */
+
+public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class);
+
+    private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16;
+
+    private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 60000;
+
+    private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000;
+
+    private static final int SHUTDOWN_TIMEOUT = 10;
+
+    private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS;
+
+    private final ScheduledExecutorService executorService;
+
+    private final long refreshMs;
+
+    private final ReadWriteLock refreshLock = new ReentrantReadWriteLock();
+
+    private final Map<String, Long> missingKeyIds;
+
+    private List<JsonWebKey> jsonWebKeys;
+
+    private boolean isInited;
+
+    /**
+     *
+     * @param location  HTTP/HTTPS endpoint from which to retrieve the JWKS based on
+     *                  the OAuth/OIDC standard
+     * @param refreshMs The number of milliseconds between refresh passes to connect
+     *                  to the OAuth/OIDC JWKS endpoint to retrieve the latest set
+     */
+
+    public RefreshingHttpsJwks(String location, long refreshMs) {
+        super(location);
+
+        if (refreshMs <= 0)
+            throw new IllegalArgumentException("JWKS validation key refresh configuration value refreshMs must be positive");
+
+        setDefaultCacheDuration(refreshMs);
+
+        this.refreshMs = refreshMs;
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.missingKeyIds = new LinkedHashMap<String, Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
+                return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES;
+            }
+        };
+    }
+
+    @Override
+    public void init() throws IOException {
+        try {
+            log.debug("init started");
+
+            List<JsonWebKey> localJWKs;
+
+            try {
+                localJWKs = super.getJsonWebKeys();
+            } catch (JoseException e) {
+                throw new IOException("Could not refresh JWKS", e);
+            }
+
+            try {
+                refreshLock.writeLock().lock();
+                this.jsonWebKeys = localJWKs;
+            } finally {
+                refreshLock.writeLock().unlock();
+            }
+
+            executorService.scheduleAtFixedRate(this::refreshInternal,
+                0,
+                refreshMs,
+                TimeUnit.MILLISECONDS);
+
+            log.info("JWKS validation key refresh thread started with a refresh interval of {} ms", refreshMs);
+        } finally {
+            isInited = true;
+
+            log.debug("init completed");
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            log.debug("close started");
+
+            try {
+                log.debug("JWKS validation key refresh thread shutting down");
+                executorService.shutdown();
+
+                if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT, SHUTDOWN_TIME_UNIT)) {
+                    log.warn("JWKS validation key refresh thread termination did not end after {} {}",
+                        SHUTDOWN_TIMEOUT, SHUTDOWN_TIME_UNIT);
+                }
+            } catch (InterruptedException e) {
+                log.warn("JWKS validation key refresh thread error during close", e);
+            }
+        } finally {
+            log.debug("close completed");
+        }
+    }
+
+    @Override
+    public List<JsonWebKey> getJsonWebKeys() throws JoseException, IOException {
+        if (!isInited)
+            throw new IllegalStateException("Please call init() first");
+
+        try {
+            refreshLock.readLock().lock();
+            return jsonWebKeys;
+        } finally {
+            refreshLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Internal method that will refresh the cache and if errors are encountered, re-queues
+     * the refresh attempt in a background thread.
+     */
+
+    private void refreshInternal() {
+        try {
+            log.info("JWKS validation key refresh of {} starting", getLocation());
+
+            // Call the *actual* refresh implementation.
+            refresh();
+
+            List<JsonWebKey> jwks = getJsonWebKeys();
+
+            try {
+                refreshLock.writeLock().lock();
+
+                for (JsonWebKey jwk : jwks)
+                    missingKeyIds.remove(jwk.getKeyId());
+            } finally {
+                refreshLock.writeLock().unlock();
+            }
+
+            log.info("JWKS validation key refresh of {} complete", getLocation());
+        } catch (JoseException | IOException e) {
+            // Let's wait a random, but short amount of time before trying again.
+            long waitMs = ThreadLocalRandom.current().nextLong(1000, 10000);

Review comment:
       Should we back off exponentially up to the next refresh interval?
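
A minimal sketch of what that could look like, not the PR's implementation: double the retry wait on each consecutive failure and cap it at the refresh interval. `RefreshBackoff`, `ceilingMs`, `nextWaitMs`, and `reset` are hypothetical names, and the jitter range (half the ceiling up to the ceiling) is an assumption.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper, not part of the PR: capped exponential backoff with jitter. */
final class RefreshBackoff {

    private final long initialMs;
    private final long maxMs;
    private int failures;

    RefreshBackoff(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs)
            throw new IllegalArgumentException("require 0 < initialMs <= maxMs");

        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Largest wait allowed after the given number of consecutive failures (no jitter). */
    static long ceilingMs(long initialMs, long maxMs, int failures) {
        long wait = Math.min(initialMs, maxMs);

        // Double once per failure; jump straight to the cap when doubling would pass it,
        // which also avoids overflowing a long.
        for (int i = 0; i < failures && wait < maxMs; i++)
            wait = (wait > maxMs / 2) ? maxMs : wait * 2;

        return wait;
    }

    /** Records one more failure and returns a jittered wait in [ceiling / 2, ceiling]. */
    synchronized long nextWaitMs() {
        long ceiling = ceilingMs(initialMs, maxMs, failures);

        if (failures < Integer.MAX_VALUE)
            failures++;

        return ThreadLocalRandom.current().nextLong(ceiling / 2, ceiling + 1);
    }

    /** Call after a successful refresh so the next failure starts from initialMs again. */
    synchronized void reset() {
        failures = 0;
    }
}
```

In `refreshInternal()`, the catch block would use `nextWaitMs()` in place of the fixed 1 to 10 second random wait, with `maxMs` set to `refreshMs`, and the success path would call `reset()`.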

##########
File path: clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
##########
@@ -75,30 +78,111 @@
     public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC = "Login refresh thread will sleep until the specified window factor relative to the"
             + " credential's lifetime has been reached, at which time it will try to refresh the credential."
             + " Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used"
-            + " if no value is specified. Currently applies only to OAUTHBEARER.";
+            + " if no value is specified."
+            + OAUTHBEARER_NOTE;
     public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80;
 
     public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER = "sasl.login.refresh.window.jitter";
     public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC = "The maximum amount of random jitter relative to the credential's lifetime"
             + " that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;"
-            + " a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.";
+            + " a default value of 0.05 (5%) is used if no value is specified."
+            + OAUTHBEARER_NOTE;
     public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05;
 
     public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS = "sasl.login.refresh.min.period.seconds";
     public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC = "The desired minimum time for the login refresh thread to wait before refreshing a credential,"
             + " in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified.  This value and "
             + " sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
-            + " Currently applies only to OAUTHBEARER.";
+            + OAUTHBEARER_NOTE;
     public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60;
 
     public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS = "sasl.login.refresh.buffer.seconds";
     public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC = "The amount of buffer time before credential expiration to maintain when refreshing a credential,"
             + " in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain"
             + " as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of  300 (5 minutes) is used if no value is specified."
             + " This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
-            + " Currently applies only to OAUTHBEARER.";
+            + OAUTHBEARER_NOTE;
     public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300;
 
+    public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms";
+    public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider connection timeout."
+            + OAUTHBEARER_NOTE;
+
+    public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms";
+    public static final String SASL_LOGIN_READ_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider read timeout."
+            + OAUTHBEARER_NOTE;
+
+    // These are only specified here outside their normal groupings so that they can be
+    // forward referencing.
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms";
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS = "sasl.login.retry.backoff.max.ms";
+
+    private static final String EXPONENTIAL_BACKOFF_NOTE = " Login uses an exponential backoff algorithm with an initial wait based on the"
+            + " " + SASL_LOGIN_RETRY_BACKOFF_MS
+            + " setting and will double in wait length between attempts up to a maximum wait length specified by the"
+            + " " + SASL_LOGIN_RETRY_BACKOFF_MAX_MS
+            + " setting.";
+
+    public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS = 10000;
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC = "The (optional) value in milliseconds for the maximum wait between login attempts to the"
+            + " external authentication provider."
+            + EXPONENTIAL_BACKOFF_NOTE;
+
+    public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS = 100;
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MS_DOC = "The (optional) value in milliseconds for the initial wait between login attempts to the external"
+            + " authentication provider."
+            + EXPONENTIAL_BACKOFF_NOTE;
+
+    public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "sasl.oauthbearer.scope.claim.name";
+    public static final String DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "scope";
+    public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC = "The OAuth claim for the scope is often named \"" + DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME + "\", but this (optional)"
+            + " setting can provide a different name to use for the scope included in the JWT payload's claims if the OAuth/OIDC provider uses a different"
+            + " name for that claim.";
+
+    public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sasl.oauthbearer.sub.claim.name";
+    public static final String DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sub";
+    public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC = "The OAuth claim for the subject is often named \"" + DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME + "\", but this (optional)"
+            + " setting can provide a different name to use for the subject included in the JWT payload's claims if the OAuth/OIDC provider uses a different"
+            + " name for that claim.";
+
+    public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI = "sasl.oauthbearer.token.endpoint.uri";
+    public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI_DOC = "The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer's token"
+            + " endpoint URI to which requests will be made to login based on the configuration in " + SASL_JAAS_CONFIG + ". If the URL is file-based, it"
+            + " specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.";
+
+    // These are only specified here outside their normal groupings so that they can be
+    // forward referencing.
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_URI = "sasl.oauthbearer.jwks.endpoint.uri";
+    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS = "sasl.oauthbearer.jwks.endpoint.refresh.ms";

Review comment:
       Should we keep this close to its default and doc?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.jose4j.jwk.HttpsJwks;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.lang.JoseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or
+ * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's
+ * possible to receive a JWT that contains a <code>kid</code> that points to yet-unknown JWK,
+ * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice,
+ * keys are made available for some amount of time before they're used within JWTs.
+ *
+ * This instance is created and provided to the
+ * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using
+ * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then
+ * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of
+ * a JWT.
+ *
+ * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver
+ * @see org.jose4j.keys.resolvers.VerificationKeyResolver
+ * @see ValidatorAccessTokenValidator
+ */
+
+public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class);
+
+    private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16;
+
+    private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 60000;
+
+    private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000;
+
+    private static final int SHUTDOWN_TIMEOUT = 10;
+
+    private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS;
+
+    private final ScheduledExecutorService executorService;
+
+    private final long refreshMs;
+
+    private final ReadWriteLock refreshLock = new ReentrantReadWriteLock();
+
+    private final Map<String, Long> missingKeyIds;
+
+    private List<JsonWebKey> jsonWebKeys;
+
+    private boolean isInited;
+
+    /**
+     *
+     * @param location  HTTP/HTTPS endpoint from which to retrieve the JWKS based on
+     *                  the OAuth/OIDC standard
+     * @param refreshMs The number of milliseconds between refresh passes to connect
+     *                  to the OAuth/OIDC JWKS endpoint to retrieve the latest set
+     */
+
+    public RefreshingHttpsJwks(String location, long refreshMs) {
+        super(location);
+
+        if (refreshMs <= 0)
+            throw new IllegalArgumentException("JWKS validation key refresh configuration value refreshMs must be positive");
+
+        setDefaultCacheDuration(refreshMs);
+
+        this.refreshMs = refreshMs;
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.missingKeyIds = new LinkedHashMap<String, Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
+                return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES;
+            }
+        };
+    }
+
+    @Override
+    public void init() throws IOException {
+        try {
+            log.debug("init started");
+
+            List<JsonWebKey> localJWKs;
+
+            try {
+                localJWKs = super.getJsonWebKeys();
+            } catch (JoseException e) {
+                throw new IOException("Could not refresh JWKS", e);
+            }
+
+            try {
+                refreshLock.writeLock().lock();
+                this.jsonWebKeys = localJWKs;
+            } finally {
+                refreshLock.writeLock().unlock();
+            }
+
+            executorService.scheduleAtFixedRate(this::refreshInternal,
+                0,
+                refreshMs,
+                TimeUnit.MILLISECONDS);
+
+            log.info("JWKS validation key refresh thread started with a refresh interval of {} ms", refreshMs);
+        } finally {
+            isInited = true;
+
+            log.debug("init completed");
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            log.debug("close started");
+
+            try {
+                log.debug("JWKS validation key refresh thread shutting down");
+                executorService.shutdown();
+
+                if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT, SHUTDOWN_TIME_UNIT)) {
+                    log.warn("JWKS validation key refresh thread termination did not end after {} {}",
+                        SHUTDOWN_TIMEOUT, SHUTDOWN_TIME_UNIT);
+                }
+            } catch (InterruptedException e) {
+                log.warn("JWKS validation key refresh thread error during close", e);
+            }
+        } finally {
+            log.debug("close completed");
+        }
+    }
+
+    @Override
+    public List<JsonWebKey> getJsonWebKeys() throws JoseException, IOException {
+        if (!isInited)
+            throw new IllegalStateException("Please call init() first");
+
+        try {
+            refreshLock.readLock().lock();
+            return jsonWebKeys;
+        } finally {
+            refreshLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Internal method that will refresh the cache and if errors are encountered, re-queues
+     * the refresh attempt in a background thread.
+     */
+
+    private void refreshInternal() {
+        try {
+            log.info("JWKS validation key refresh of {} starting", getLocation());
+
+            // Call the *actual* refresh implementation.
+            refresh();
+
+            List<JsonWebKey> jwks = getJsonWebKeys();
+
+            try {
+                refreshLock.writeLock().lock();
+
+                for (JsonWebKey jwk : jwks)
+                    missingKeyIds.remove(jwk.getKeyId());
+            } finally {
+                refreshLock.writeLock().unlock();
+            }
+
+            log.info("JWKS validation key refresh of {} complete", getLocation());
+        } catch (JoseException | IOException e) {
+            // Let's wait a random, but short amount of time before trying again.
+            long waitMs = ThreadLocalRandom.current().nextLong(1000, 10000);
+
+            String message = String.format("JWKS validation key refresh of %s encountered an error; waiting %s ms before trying again",
+                getLocation(),
+                waitMs);
+            log.warn(message, e);
+
+            executorService.schedule(this::refreshInternal, waitMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public boolean maybeScheduleRefreshForMissingKeyId(String keyId) {
+        if (keyId.length() > MISSING_KEY_ID_MAX_KEY_LENGTH) {
+            log.warn("Key ID starting with {} with length {} was too long to cache", keyId.substring(0, 16), keyId.length());
+            return false;
+        } else {
+            try {
+                refreshLock.writeLock().lock();
+
+                // If there's no entry in the missing key ID cache for the incoming key ID,
+                // or it has expired, schedule a refresh.
+                Long lastCheckTime = missingKeyIds.get(keyId);

Review comment:
       It seems that this is the next check time?
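
To make the naming point concrete, here is a small sketch of a bounded missing-key cache whose stored value is the time at which the key may next be checked. The rest of `maybeScheduleRefreshForMissingKeyId` is not shown in the hunk above, so the logic here is an assumption about its intent; `MissingKeyIdCache` and `shouldRefresh` are hypothetical names, and the clock is passed in as a parameter to keep the example deterministic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of the PR: LRU cache of key IDs and their next check times. */
final class MissingKeyIdCache {

    private final long inFlightMs;
    private final Map<String, Long> nextCheckTimes;

    MissingKeyIdCache(int maxEntries, long inFlightMs) {
        this.inFlightMs = inFlightMs;

        // Access-ordered LinkedHashMap that evicts the least recently used entry when full.
        this.nextCheckTimes = new LinkedHashMap<String, Long>(maxEntries, .75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /**
     * Returns true if a refresh should be scheduled for this key ID, and records the
     * earliest time at which the same key ID may trigger another refresh.
     */
    synchronized boolean shouldRefresh(String keyId, long nowMs) {
        Long nextCheckTime = nextCheckTimes.get(keyId);

        if (nextCheckTime == null || nextCheckTime <= nowMs) {
            nextCheckTimes.put(keyId, nowMs + inFlightMs);
            return true;
        }

        return false;
    }
}
```

With the value named `nextCheckTime`, the comparison against the current time reads naturally: a refresh is allowed only once the stored time has passed.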

##########
File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>HttpAccessTokenRetriever</code> is an {@link AccessTokenRetriever} that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+    private static final Set<Integer> UNRETRYABLE_HTTP_CODES;
+
+    public static final String AUTHORIZATION_HEADER = "Authorization";
+
+    static {
+        // This does not have to be an exhaustive list. There are other HTTP codes that
+        // are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585)
+        // that we won't worry about yet. The worst case if a status code is missing from
+        // this set is that the request will be retried.
+        UNRETRYABLE_HTTP_CODES = new HashSet<>();
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+        UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+    }
+
+    private final String clientId;
+
+    private final String clientSecret;
+
+    private final String scope;
+
+    private final SSLSocketFactory sslSocketFactory;
+
+    private final String tokenEndpointUri;
+
+    private final long loginRetryBackoffMs;
+
+    private final long loginRetryBackoffMaxMs;
+
+    private final Integer loginConnectTimeoutMs;
+
+    private final Integer loginReadTimeoutMs;
+
+    public HttpAccessTokenRetriever(String clientId,
+        String clientSecret,
+        String scope,
+        SSLSocketFactory sslSocketFactory,
+        String tokenEndpointUri,
+        long loginRetryBackoffMs,
+        long loginRetryBackoffMaxMs,
+        Integer loginConnectTimeoutMs,
+        Integer loginReadTimeoutMs) {
+        this.clientId = Objects.requireNonNull(clientId);
+        this.clientSecret = Objects.requireNonNull(clientSecret);
+        this.scope = scope;
+        this.sslSocketFactory = sslSocketFactory;
+        this.tokenEndpointUri = Objects.requireNonNull(tokenEndpointUri);
+        this.loginRetryBackoffMs = loginRetryBackoffMs;
+        this.loginRetryBackoffMaxMs = loginRetryBackoffMaxMs;
+        this.loginConnectTimeoutMs = loginConnectTimeoutMs;
+        this.loginReadTimeoutMs = loginReadTimeoutMs;
+    }
+
+    /**
+     * Retrieves a JWT access token in its serialized three-part form. The implementation
+     * is free to determine how it should be retrieved but should not perform validation
+     * on the result.
+     *
+     * <b>Note</b>: This is a blocking function and callers should be aware that the
+     * implementation communicates over a network. The facility in the
+     * {@link javax.security.auth.spi.LoginModule} from which this is ultimately called
+     * does not provide an asynchronous approach.
+     *
+     * @return Non-<code>null</code> JWT access token string
+     *
+     * @throws IOException Thrown on errors related to IO during retrieval
+     */
+
+    @Override
+    public String retrieve() throws IOException {
+        String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
+        String requestBody = formatRequestBody(scope);
+
+        Retry<String> retry = new Retry<>(Time.SYSTEM,
+            loginRetryBackoffMs,
+            loginRetryBackoffMaxMs);
+
+        Map<String, String> headers = Collections.singletonMap(AUTHORIZATION_HEADER, authorizationHeader);
+
+        String responseBody = retry.execute(() -> {
+            HttpURLConnection con = (HttpURLConnection) new URL(tokenEndpointUri).openConnection();
+
+            if (sslSocketFactory != null && con instanceof HttpsURLConnection)
+                ((HttpsURLConnection) con).setSSLSocketFactory(sslSocketFactory);
+
+            try {
+                return post(con, headers, requestBody, loginConnectTimeoutMs, loginReadTimeoutMs);
+            } finally {
+                con.disconnect();
+            }
+        });
+        log.debug("retrieve - responseBody: {}", responseBody);
+
+        return parseAccessToken(responseBody);
+    }
+
+    public static String post(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        handleInput(con, headers, requestBody, connectTimeoutMs, readTimeoutMs);
+        return handleOutput(con);
+    }
+
+    private static void handleInput(HttpURLConnection con,
+        Map<String, String> headers,
+        String requestBody,
+        Integer connectTimeoutMs,
+        Integer readTimeoutMs)
+        throws IOException, UnretryableException {
+        log.debug("handleInput - starting post for {}", con.getURL());
+        con.setRequestMethod("POST");
+        con.setRequestProperty("Accept", "application/json");
+
+        if (headers != null) {
+            for (Map.Entry<String, String> header : headers.entrySet())
+                con.setRequestProperty(header.getKey(), header.getValue());
+        }
+
+        con.setRequestProperty("Cache-Control", "no-cache");
+
+        if (requestBody != null) {
+            con.setRequestProperty("Content-Length", String.valueOf(requestBody.length()));
+            con.setDoOutput(true);
+        }
+
+        con.setUseCaches(false);
+
+        if (connectTimeoutMs != null)
+            con.setConnectTimeout(connectTimeoutMs);
+
+        if (readTimeoutMs != null)
+            con.setReadTimeout(readTimeoutMs);
+
+        log.debug("handleInput - preparing to connect to {}", con.getURL());
+        con.connect();
+
+        if (requestBody != null) {
+            try (OutputStream os = con.getOutputStream()) {
+                ByteArrayInputStream is = new ByteArrayInputStream(requestBody.getBytes(
+                    StandardCharsets.UTF_8));
+                log.debug("handleInput - preparing to write request body to {}", con.getURL());
+                copy(is, os);
+            }
+        }
+    }
+
+    static String handleOutput(final HttpURLConnection con) throws IOException {
+        int responseCode = con.getResponseCode();
+        log.debug("handleOutput - responseCode: {}", responseCode);
+
+        String responseBody = null;
+
+        try (InputStream is = con.getInputStream()) {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            log.debug("handleOutput - preparing to read response body from {}", con.getURL());
+            copy(is, os);
+            responseBody = os.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
+            log.warn("handleOutput - error retrieving data", e);
+        }
+
+        if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+            if (responseBody == null || responseBody.isEmpty())
+                throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %s from %s", responseCode, con.getURL()));
+
+            log.debug("handleOutput - responseCode: {}, response: {}", responseCode, responseBody);

Review comment:
       Should we log the response code and body before throwing the IOException above, so that an empty response is logged too?
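
       For illustration, a minimal sketch of the reordering, not the PR's actual code. `HandleOutputSketch`, `checkResponse`, and the `debugLog` parameter are hypothetical names; `debugLog` stands in for the slf4j `log.debug` call so the example runs without dependencies. The non-success branch lies outside the quoted hunk, so the sketch simply fails there.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.function.Consumer;

public class HandleOutputSketch {

    // Hypothetical stand-in for the tail of handleOutput(). `debugLog` replaces the
    // slf4j logger (an assumption made only to keep the sketch self-contained).
    static String checkResponse(int responseCode, String responseBody, String url,
                                Consumer<String> debugLog) throws IOException {
        if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
            // Log first, so the code and body are recorded even when the body is empty.
            debugLog.accept(String.format("handleOutput - responseCode: %s, response: %s",
                responseCode, responseBody));

            if (responseBody == null || responseBody.isEmpty())
                throw new IOException(String.format(
                    "The token endpoint response was unexpectedly empty despite response code %s from %s",
                    responseCode, url));

            return responseBody;
        }

        // Handling of other response codes is not shown in the quoted hunk; assumed to fail.
        throw new IOException(String.format("Unexpected response code %s from %s", responseCode, url));
    }

    public static void main(String[] args) throws IOException {
        String body = checkResponse(200, "{\"access_token\":\"abc\"}",
            "https://example.com/token", System.out::println);
        System.out.println(body);
    }
}
```

       With this ordering, an empty 200 response still produces one debug line before the exception is raised.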

##########
File path: clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
##########
@@ -75,30 +78,111 @@
     public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC = "Login refresh thread will sleep until the specified window factor relative to the"
             + " credential's lifetime has been reached, at which time it will try to refresh the credential."
             + " Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used"
-            + " if no value is specified. Currently applies only to OAUTHBEARER.";
+            + " if no value is specified."
+            + OAUTHBEARER_NOTE;
     public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80;
 
     public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER = "sasl.login.refresh.window.jitter";
     public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC = "The maximum amount of random jitter relative to the credential's lifetime"
             + " that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;"
-            + " a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.";
+            + " a default value of 0.05 (5%) is used if no value is specified."
+            + OAUTHBEARER_NOTE;
     public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05;
 
     public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS = "sasl.login.refresh.min.period.seconds";
     public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC = "The desired minimum time for the login refresh thread to wait before refreshing a credential,"
             + " in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified.  This value and "
             + " sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
-            + " Currently applies only to OAUTHBEARER.";
+            + OAUTHBEARER_NOTE;
     public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60;
 
     public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS = "sasl.login.refresh.buffer.seconds";
     public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC = "The amount of buffer time before credential expiration to maintain when refreshing a credential,"
             + " in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain"
             + " as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of  300 (5 minutes) is used if no value is specified."
             + " This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
-            + " Currently applies only to OAUTHBEARER.";
+            + OAUTHBEARER_NOTE;
     public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300;
 
+    public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms";
+    public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider connection timeout."
+            + OAUTHBEARER_NOTE;
+
+    public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms";
+    public static final String SASL_LOGIN_READ_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider read timeout."
+            + OAUTHBEARER_NOTE;
+
+    // These are only specified here outside their normal groupings so that they can be
+    // forward referencing.
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms";
+    public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS = "sasl.login.retry.backoff.max.ms";
+
+    private static final String EXPONENTIAL_BACKOFF_NOTE = " Login uses an exponential backoff algorithm with an initial wait based on the"
+            + " " + SASL_LOGIN_RETRY_BACKOFF_MS
+            + " setting and will double in wait length between attempts up to a maximum wait length specified by the"
+            + " " + SASL_LOGIN_RETRY_BACKOFF_MAX_MS
+            + " setting.";

Review comment:
       Should we append OAUTHBEARER_NOTE here as well, as the other SASL login docs above do?
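
       A rough sketch of what that could look like, together with the doubling schedule the note describes. The value of `OAUTHBEARER_NOTE` is an assumption inferred from the "Currently applies only to OAUTHBEARER." text that the diff replaces. `BackoffNoteSketch` and `waitMs` are hypothetical names for illustration only and are not claimed to match the PR's `Retry` class.

```java
public class BackoffNoteSketch {

    // Assumed value, inferred from the text removed elsewhere in this diff.
    static final String OAUTHBEARER_NOTE = " Currently applies only to OAUTHBEARER.";

    static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms";
    static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS = "sasl.login.retry.backoff.max.ms";

    // Same wording as the quoted hunk, with the note appended at the end.
    static final String EXPONENTIAL_BACKOFF_NOTE = " Login uses an exponential backoff algorithm with an initial wait based on the"
            + " " + SASL_LOGIN_RETRY_BACKOFF_MS
            + " setting and will double in wait length between attempts up to a maximum wait length specified by the"
            + " " + SASL_LOGIN_RETRY_BACKOFF_MAX_MS
            + " setting."
            + OAUTHBEARER_NOTE;

    // Hypothetical helper: wait before retry number `attempt` (0-based), doubling
    // from backoffMs on each attempt and capped at backoffMaxMs.
    static long waitMs(long backoffMs, long backoffMaxMs, int attempt) {
        long wait = backoffMs;
        for (int i = 0; i < attempt && wait < backoffMaxMs; i++)
            wait *= 2;
        return Math.min(wait, backoffMaxMs);
    }

    public static void main(String[] args) {
        System.out.println(EXPONENTIAL_BACKOFF_NOTE);
        for (int attempt = 0; attempt < 8; attempt++)
            System.out.println("attempt " + attempt + ": " + waitMs(100, 10_000, attempt) + " ms");
    }
}
```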



